Github user sohami commented on a diff in the pull request:
https://github.com/apache/drill/pull/950#discussion_r141215109
--- Diff: contrib/native/client/src/clientlib/channel.cpp ---
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <boost/lexical_cast.hpp>
+#include <boost/regex.hpp>
+
+#include "drill/drillConfig.hpp"
+#include "drill/drillError.hpp"
+#include "drill/userProperties.hpp"
+#include "channel.hpp"
+#include "errmsgs.hpp"
+#include "logger.hpp"
+#include "utils.hpp"
+#include "zookeeperClient.hpp"
+
+#include "GeneralRPC.pb.h"
+
+namespace Drill{
+
+ConnectionEndpoint::ConnectionEndpoint(const char* connStr){ // Builds an endpoint from a raw connect string; nothing is parsed here.
+ m_connectString=connStr; // parsed later by parseConnectString(), which getDrillbitEndpoint() calls. NOTE(review): connStr is stored unchecked - presumably callers never pass NULL; confirm.
+ m_pError=NULL; // starts with no error object attached
+}
+
+ConnectionEndpoint::ConnectionEndpoint(const char* host, const char* port){ // Builds an endpoint from an explicit host/port pair; no connect string is stored.
+ m_host=host;
+ m_port=port;
+ m_protocol="drillbit"; // direct connection: "drillbit" is one of the two protocol names isDirectConnection() accepts
+ m_pError=NULL; // starts with no error object attached
+}
+
+ConnectionEndpoint::~ConnectionEndpoint(){ // Frees the error object pointed to by m_pError, if one is set.
+ if(m_pError!=NULL){
+ delete m_pError; m_pError=NULL; // reset after delete so the member does not keep a stale pointer
+ }
+}
+
+connectionStatus_t ConnectionEndpoint::getDrillbitEndpoint(){ // Resolves the drillbit host/port from the connect string, or validates a host/port pair given directly. Returns CONN_SUCCESS, otherwise whatever handleError() or getDrillbitEndpointFromZk() returned.
+ connectionStatus_t ret=CONN_SUCCESS;
+ if(!m_connectString.empty()){ // connect-string path
+ parseConnectString(); // fills m_protocol (and m_host/m_port for direct connections) when the string matches
+ if(m_protocol.empty()){ // no protocol was extracted from the connect string
+ return handleError(CONN_INVALID_INPUT,
getMessage(ERR_CONN_UNKPROTO, "<invalid_string>"));
+ }
+ if(isZookeeperConnection()){ // "zk": host and port are looked up through ZooKeeper
+ if((ret=getDrillbitEndpointFromZk())!=CONN_SUCCESS){
+ return ret;
+ }
+ }else if(!this->isDirectConnection()){ // neither "zk" nor a direct protocol: reject and report the protocol name
+ return handleError(CONN_INVALID_INPUT,
getMessage(ERR_CONN_UNKPROTO, this->getProtocol().c_str()));
+ }
+ }else{ // no connect string: host and port must already be set (see the host/port constructor)
+ if(m_host.empty() || m_port.empty()){
+ return handleError(CONN_INVALID_INPUT,
getMessage(ERR_CONN_NOCONNSTR));
+ }
+ }
+ return ret;
+}
+
+void ConnectionEndpoint::parseConnectString(){ // Splits m_connectString of the form <protocol>=<host>:<port>[/<path>] into member fields. On a mismatch it only logs a debug message and leaves the members untouched.
+ boost::regex connStrExpr("(.*)=(.*):([0-9]+)(?:/(.+))?"); // groups: 1=protocol, 2=host, 3=port (digits only), 4=optional path after '/'
+ boost::cmatch matched;
+
+ if(boost::regex_match(m_connectString.c_str(), matched, connStrExpr)){
+ m_protocol.assign(matched[1].first, matched[1].second);
+ std::string host, port;
+ host.assign(matched[2].first, matched[2].second);
+ port.assign(matched[3].first, matched[3].second);
+ if(isDirectConnection()){ // NOTE(review): isDirectConnection() asserts m_protocol is non-empty, but group 1 is (.*) and can match nothing (e.g. "=host:1") - confirm this cannot trip the assert
+ // if the connection is to a zookeeper,
+ // we will get the host and the port only after connecting to
the Zookeeper
+ m_host=host;
+ m_port=port;
+ }
+ m_hostPortStr=host+std::string(":")+port; // kept for every protocol; getDrillbitEndpointFromZk() passes it to ZookeeperClient::getAllDrillbits()
+ std::string pathToDrill;
+ if(matched.size()==5){ // NOTE(review): with four capture groups size() is presumably always 5 on a match, so the empty() test below is what detects a missing path - confirm
+ pathToDrill.assign(matched[4].first, matched[4].second);
+ if(!pathToDrill.empty()){
+ m_pathToDrill=std::string("/")+pathToDrill; // put back the leading '/' that the regex matched outside group 4
+ }
+ }
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG)
+ << "Conn str: "<< m_connectString
+ << "; protocol: " << m_protocol
+ << "; host: " << host
+ << "; port: " << port
+ << "; path to drill: " << m_pathToDrill
+ << std::endl;)
+ } else {
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Invalid connect string.
Regexp did not match" << std::endl;)
+ }
+
+ return;
+}
+
+bool ConnectionEndpoint::isDirectConnection(){ // True when m_protocol is exactly "local" or "drillbit" (strcmp, so the match is case-sensitive).
+ assert(!m_protocol.empty()); // precondition: the protocol must already be set before this is called
+ return (!strcmp(m_protocol.c_str(), "local") ||
!strcmp(m_protocol.c_str(), "drillbit"));
+}
+
+bool ConnectionEndpoint::isZookeeperConnection(){ // True when m_protocol is exactly "zk" (strcmp, so the match is case-sensitive).
+ assert(!m_protocol.empty()); // precondition: the protocol must already be set before this is called
+ return (!strcmp(m_protocol.c_str(), "zk"));
+}
+
+connectionStatus_t ConnectionEndpoint::getDrillbitEndpointFromZk(){
+ ZookeeperClient zook(m_pathToDrill);
+ assert(!m_hostPortStr.empty());
+ std::vector<std::string> drillbits;
+ if(zook.getAllDrillbits(m_hostPortStr.c_str(), drillbits)!=0){
+ return handleError(CONN_ZOOKEEPER_ERROR,
getMessage(ERR_CONN_ZOOKEEPER, zook.getError().c_str()));
+ }
+ if (drillbits.empty()){
+ return handleError(CONN_FAILURE, getMessage(ERR_CONN_ZKNODBIT));
+ }
+ Utils::shuffle(drillbits);
+ exec::DrillbitEndpoint endpoint;
+ int err = zook.getEndPoint(drillbits[drillbits.size() -1],
endpoint);// get the last one in the list
+ if(!err){
+ m_host=boost::lexical_cast<std::string>(endpoint.address());
+ m_port=boost::lexical_cast<std::string>(endpoint.user_port());
+ }
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Choosing drillbit <" <<
(drillbits.size() - 1) << ">. Selected " << endpoint.DebugString() <<
std::endl;)
--- End diff --
We should return a proper error code if any error occurs while fetching the
Drillbit endpoint. This was handled in the older code as well.
```
if(err){
return handleConnError(CONN_ZOOKEEPER_ERROR,
getMessage(ERR_CONN_ZOOKEEPER, zook.getError().c_str()));
}
```
---