http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a542fb3/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.cc deleted file mode 100644 index c3cbf7e..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.cc +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -#include "datatransfer.h" - -#include "hdfspp/status.h" - -namespace hdfs { - -namespace DataTransferSaslStreamUtil { - -static const auto kSUCCESS = hadoop::hdfs::DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_SUCCESS; - -using hadoop::hdfs::DataTransferEncryptorMessageProto; - -Status ConvertToStatus(const DataTransferEncryptorMessageProto *msg, std::string *payload) { - using namespace hadoop::hdfs; - auto s = msg->status(); - if (s == DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_ERROR_UNKNOWN_KEY) { - payload->clear(); - return Status::Exception("InvalidEncryptionKeyException", msg->message().c_str()); - } else if (s == DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_ERROR) { - payload->clear(); - return Status::Error(msg->message().c_str()); - } else { - *payload = msg->payload(); - return Status::OK(); - } -} - -void PrepareInitialHandshake(DataTransferEncryptorMessageProto *msg) { - msg->set_status(kSUCCESS); - msg->set_payload(""); -} - -} -}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a542fb3/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h deleted file mode 100644 index 93103c5..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h +++ /dev/null @@ -1,75 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -#ifndef LIB_READER_DATA_TRANSFER_H_ -#define LIB_READER_DATA_TRANSFER_H_ - -#include "common/sasl_authenticator.h" -#include "common/async_stream.h" -#include "connection/datanodeconnection.h" -#include <memory> - - -namespace hdfs { - -enum { - kDataTransferVersion = 28, - kDataTransferSasl = 0xdeadbeef, -}; - -enum Operation { - kWriteBlock = 80, - kReadBlock = 81, -}; - -template <class Stream> class DataTransferSaslStream : public DataNodeConnection { -public: - DataTransferSaslStream(std::shared_ptr<Stream> stream, const std::string &username, - const std::string &password) - : stream_(stream), authenticator_(username, password) {} - - template <class Handler> void Handshake(const Handler &next); - - void async_read_some(const MutableBuffers &buf, - std::function<void (const asio::error_code & error, - std::size_t bytes_transferred) > handler) override { - stream_->async_read_some(buf, handler); - } - - void async_write_some(const ConstBuffers &buf, - std::function<void (const asio::error_code & error, - std::size_t bytes_transferred) > handler) override { - stream_->async_write_some(buf, handler); - } - - void Connect(std::function<void(Status status, std::shared_ptr<DataNodeConnection> dn)> handler) override - {(void)handler; /*TODO: Handshaking goes here*/}; - - void Cancel(); -private: - DataTransferSaslStream(const DataTransferSaslStream &) = delete; - DataTransferSaslStream &operator=(const DataTransferSaslStream &) = delete; - std::shared_ptr<Stream> stream_; - DigestMD5Authenticator authenticator_; - struct ReadSaslMessage; - struct Authenticator; -}; -} - -#include "datatransfer_impl.h" - -#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a542fb3/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h deleted file mode 100644 index 77e618d..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h +++ /dev/null @@ -1,136 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -#ifndef LIB_READER_DATATRANFER_IMPL_H_ -#define LIB_READER_DATATRANFER_IMPL_H_ - -#include "datatransfer.pb.h" -#include "common/continuation/continuation.h" -#include "common/continuation/asio.h" -#include "common/continuation/protobuf.h" - -#include <asio/read.hpp> -#include <asio/buffer.hpp> - -namespace hdfs { - -namespace DataTransferSaslStreamUtil { -Status -ConvertToStatus(const ::hadoop::hdfs::DataTransferEncryptorMessageProto *msg, - std::string *payload); -void PrepareInitialHandshake( - ::hadoop::hdfs::DataTransferEncryptorMessageProto *msg); -} - -template <class Stream> -struct DataTransferSaslStream<Stream>::Authenticator - : continuation::Continuation { - Authenticator(DigestMD5Authenticator *authenticator, - const std::string *request, - hadoop::hdfs::DataTransferEncryptorMessageProto *msg) - : authenticator_(authenticator), request_(request), msg_(msg) {} - - virtual void Run(const Next &next) override { - using namespace ::hadoop::hdfs; - std::string response; - Status status = authenticator_->EvaluateResponse(*request_, &response); - msg_->Clear(); - if (status.ok()) { - // TODO: Handle encryption scheme - msg_->set_payload(response); - msg_->set_status( - DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_SUCCESS); - } else { - msg_->set_status( - DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_ERROR); - } - next(Status::OK()); - } - -private: - DigestMD5Authenticator *authenticator_; - const std::string *request_; - hadoop::hdfs::DataTransferEncryptorMessageProto *msg_; -}; - -template <class Stream> -struct DataTransferSaslStream<Stream>::ReadSaslMessage - : continuation::Continuation { - ReadSaslMessage(std::shared_ptr<Stream> stream, std::string *data) - : stream_(stream), data_(data), read_pb_(stream, &resp_) {} - - virtual void Run(const Next &next) override { - auto handler = [this, next](const Status &status) { - if (status.ok()) { - Status new_stat = - DataTransferSaslStreamUtil::ConvertToStatus(&resp_, 
data_); - next(new_stat); - } else { - next(status); - } - }; - read_pb_.Run(handler); - } - -private: - std::shared_ptr<Stream> stream_; - std::string *data_; - hadoop::hdfs::DataTransferEncryptorMessageProto resp_; - continuation::ReadDelimitedPBMessageContinuation<Stream, 1024> read_pb_; -}; - -template <class Stream> -template <class Handler> -void DataTransferSaslStream<Stream>::Handshake(const Handler &next) { - using ::hadoop::hdfs::DataTransferEncryptorMessageProto; - using ::hdfs::asio_continuation::Write; - using ::hdfs::continuation::WriteDelimitedPBMessage; - - static const int kMagicNumber = htonl(kDataTransferSasl); - static const asio::const_buffers_1 kMagicNumberBuffer = asio::buffer( - reinterpret_cast<const char *>(kMagicNumber), sizeof(kMagicNumber)); - - struct State { - DataTransferEncryptorMessageProto req0; - std::string resp0; - DataTransferEncryptorMessageProto req1; - std::string resp1; - std::shared_ptr<Stream> stream; - }; - auto m = continuation::Pipeline<State>::Create(); - State *s = &m->state(); - s->stream = stream_; - - DataTransferSaslStreamUtil::PrepareInitialHandshake(&s->req0); - - m->Push(Write(stream_, kMagicNumberBuffer)) - .Push(WriteDelimitedPBMessage(stream_, &s->req0)) - .Push(new ReadSaslMessage(stream_, &s->resp0)) - .Push(new Authenticator(&authenticator_, &s->resp0, &s->req1)) - .Push(WriteDelimitedPBMessage(stream_, &s->req1)) - .Push(new ReadSaslMessage(stream_, &s->resp1)); - m->Run([next](const Status &status, const State &) { next(status); }); -} - -template <class Stream> -void DataTransferSaslStream<Stream>::Cancel() { - /* implement with secured reads */ -} - -} - -#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a542fb3/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/fileinfo.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/fileinfo.h 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/fileinfo.h deleted file mode 100644 index 2fb42a6..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/fileinfo.h +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef LIB_READER_FILEINFO_H_ -#define LIB_READER_FILEINFO_H_ - -#include "ClientNamenodeProtocol.pb.h" - -namespace hdfs { - -/** - * Information that is assumed to be unchanging about a file for the duration of - * the operations. 
- */ -struct FileInfo { - unsigned long long file_length_; - bool under_construction_; - bool last_block_complete_; - std::vector<::hadoop::hdfs::LocatedBlockProto> blocks_; -}; - -} - -#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a542fb3/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/readergroup.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/readergroup.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/readergroup.cc deleted file mode 100644 index a64800a..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/readergroup.cc +++ /dev/null @@ -1,55 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -#include "readergroup.h" - -#include <algorithm> - -namespace hdfs { - -void ReaderGroup::AddReader(std::shared_ptr<BlockReader> reader) { - std::lock_guard<std::recursive_mutex> state_lock(state_lock_); - ClearDeadReaders(); - std::weak_ptr<BlockReader> weak_ref = reader; - readers_.push_back(weak_ref); -} - -std::vector<std::shared_ptr<BlockReader>> ReaderGroup::GetLiveReaders() { - std::lock_guard<std::recursive_mutex> state_lock(state_lock_); - - std::vector<std::shared_ptr<BlockReader>> live_readers; - for(auto it=readers_.begin(); it != readers_.end(); it++) { - std::shared_ptr<BlockReader> live_reader = it->lock(); - if(live_reader) { - live_readers.push_back(live_reader); - } - } - return live_readers; -} - -void ReaderGroup::ClearDeadReaders() { - std::lock_guard<std::recursive_mutex> state_lock(state_lock_); - - auto reader_is_dead = [](const std::weak_ptr<BlockReader> &ptr) { - return ptr.expired(); - }; - - auto it = std::remove_if(readers_.begin(), readers_.end(), reader_is_dead); - readers_.erase(it, readers_.end()); -} - -} // end namespace hdfs http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a542fb3/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/readergroup.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/readergroup.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/readergroup.h deleted file mode 100644 index e6173f7..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/readergroup.h +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef READER_READER_GROUP_H_ -#define READER_READER_GROUP_H_ - -#include "block_reader.h" - -#include <memory> -#include <vector> -#include <mutex> - -namespace hdfs { - -/** - * Provide a way of logically grouping ephemeral block readers - * so that their status can be monitored or changed. - * - * Note: This does not attempt to extend the reader life - * cycle. Readers are assumed to be owned by something else - * using a shared_ptr. 
- **/ - -class ReaderGroup { - public: - ReaderGroup() {}; - void AddReader(std::shared_ptr<BlockReader> reader); - /* find live readers, promote to shared_ptr */ - std::vector<std::shared_ptr<BlockReader>> GetLiveReaders(); - private: - /* remove weak_ptrs that don't point to live object */ - void ClearDeadReaders(); - std::recursive_mutex state_lock_; - std::vector<std::weak_ptr<BlockReader>> readers_; -}; - -} // end namespace hdfs -#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a542fb3/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/CMakeLists.txt deleted file mode 100644 index e5a26fb..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/CMakeLists.txt +++ /dev/null @@ -1,31 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-#

# Sources that are always part of the rpc object library.
list(APPEND rpc_object_items
  rpc_connection_impl.cc
  rpc_engine.cc
  namenode_tracker.cc
  request.cc
  sasl_protocol.cc
  sasl_engine.cc
)

# Add the SASL backend(s) selected at configure time.
if(CMAKE_USING_CYRUS_SASL)
  list(APPEND rpc_object_items cyrus_sasl_engine.cc)
endif()
if(CMAKE_USING_GSASL)
  list(APPEND rpc_object_items gsasl_engine.cc)
endif()

add_library(rpc_obj OBJECT ${rpc_object_items})


# The sources depend on the "proto" target being built first.
add_dependencies(rpc_obj proto)
add_library(rpc $<TARGET_OBJECTS:rpc_obj>)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a542fb3/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/cyrus_sasl_engine.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/cyrus_sasl_engine.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/cyrus_sasl_engine.cc deleted file mode 100644 index 5c96ede..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/cyrus_sasl_engine.cc +++ /dev/null @@ -1,469 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ - -#include "hdfspp/locks.h" - -#include <sys/types.h> -#include "sasl/sasl.h" -#include "sasl/saslutil.h" -#include <string.h> -#include <string> -#include <sstream> -#include <unistd.h> // getpass() ( deprecated) - -#include "common/logging.h" - -#include "sasl_engine.h" -#include "cyrus_sasl_engine.h" - -namespace hdfs { - -static Mutex *getSaslMutex() { - return LockManager::getGssapiMutex(); -} - -// Forward decls of sasl callback functions -typedef int (*sasl_callback_ft)(void); -int get_name(void *context, - int id, - const char **result, - unsigned *len); - -int getrealm(void *context, - int id, - const char **availrealms, - const char **result); - -// This should be constructed once per process, and destroyed once per process - -class CyrusPerProcessData -{ -public: - static Status Init(); // Can be called many times -private: - CyrusPerProcessData(); - ~CyrusPerProcessData(); - Status init_status_; - - static CyrusPerProcessData & GetInstance(); -}; - - -/***************************************************************************** - * CYRUS UTILITY FUNCTIONS - */ - -// Cyrus-specific error messages: -// errStr() is the non-method version, to -// be called by utility routines. 
-std::string errStr( int rc) { - switch (rc) { - case SASL_NOTINIT: /* -12 */ return "SASL library not initialized"; - case SASL_WRONGMECH:/* -11 */ return "mechanism doesn't support requested feature"; - case SASL_BADSERV: /* -10 */ return "server failed mutual authentication step"; - case SASL_BADMAC: /* -9 */ return "integrity check failed"; - case SASL_TRYAGAIN: /* -8 */ return "transient failure (e.g., weak key)"; - case SASL_BADPARAM: /* -7 */ return "invalid parameter supplied"; - case SASL_NOTDONE: /* -6 */ return "can't request info until later in exchange"; - case SASL_BADPROT: /* -5 */ return "bad protocol / cancel"; - case SASL_NOMECH: /* -4 */ return "mechanism not supported"; - case SASL_BUFOVER: /* -3 */ return "overflowed buffer"; - case SASL_NOMEM: /* -2 */ return "memory shortage failure"; - case SASL_FAIL: /* -1 */ return "generic failure"; - case SASL_OK: /* 0 */ return "successful result"; - case SASL_CONTINUE: /* 1 */ return "another step is needed in authentication"; - case SASL_INTERACT: /* 2 */ return "needs user interaction"; - default: return "unknown error"; - } // switch(rc) -} // errStr() - -Status make_status(int rc) { - if (rc != SASL_OK && - rc != SASL_CONTINUE && - rc != SASL_INTERACT) { - return Status::AuthenticationFailed(errStr(rc).c_str()); - } - return Status::OK(); -} - -// SaslError() method: Use this when a method needs -// to update the engine's state. 
-Status CySaslEngine::SaslError( int rc) { - Status status = make_status(rc); - if (!status.ok()) - state_ = kErrorState; - - return status; -} - - -/***************************************************************************** -* Cyrus SASL ENGINE -*/ - -CySaslEngine::CySaslEngine() : SaslEngine(), conn_(nullptr) -{ - // Create an array of callbacks that embed a pointer to this - // so we can call methods of the engine - per_connection_callbacks_ = { - { SASL_CB_USER, (sasl_callback_ft) & get_name, this}, // userid for authZ - { SASL_CB_AUTHNAME, (sasl_callback_ft) & get_name, this}, // authid for authT - { SASL_CB_GETREALM, (sasl_callback_ft) & getrealm, this}, // krb/gssapi realm - // { SASL_CB_PASS, (sasl_callback_ft)&getsecret, this - { SASL_CB_LIST_END, (sasl_callback_ft) NULL, NULL} - }; -} - -// Cleanup of last resort. Call Finish to allow a safer check on disposal -CySaslEngine::~CySaslEngine() -{ - - if (conn_) { - try { - LockGuard saslGuard(getSaslMutex()); - sasl_dispose( &conn_); // undo sasl_client_new() - } catch (const LockFailure& e) { - LOG_ERROR(kRPC, << "Unable to dispose of SASL context due to " << e.what()); - } - } -} // destructor - -// initialize some cyrus sasl context stuff: - -Status CySaslEngine::InitCyrusSasl() -{ - int rc = SASL_OK; - - // set up some callbacks once per process: - Status init_status = CyrusPerProcessData::Init(); - if (!init_status.ok()) - return init_status; - - // Initialize the sasl_li brary with per-connection configuration: - const char * fqdn = chosen_mech_.serverid.c_str(); - const char * proto = chosen_mech_.protocol.c_str(); - - try { - LockGuard saslGuard(getSaslMutex()); - rc = sasl_client_new(proto, fqdn, NULL, NULL, &per_connection_callbacks_[0], 0, &conn_); - if (rc != SASL_OK) { - return SaslError(rc); - } - } catch (const LockFailure& e) { - return Status::MutexError("mutex that guards sasl_client_new unable to lock"); - } - - return Status::OK(); -} // cysasl_new() - -// start() method: Ask the Sasl 
ibrary, "How do we -// ask the hdfs server for service? -std::pair<Status, std::string> -CySaslEngine::Start() -{ - int rc; - Status status; - - if (state_ != kUnstarted) - LOG_WARN(kRPC, << "CySaslEngine::start() when state is " << state_); - - status = InitCyrusSasl(); - - if ( !status.ok()) { - state_ = kErrorState; - return std::make_pair( status, ""); - } - - sasl_interact_t * client_interact = NULL; - char * buf; - unsigned int buflen; - const char * chosen_mech; - std::string token; - - try { - LockGuard saslGuard(getSaslMutex()); - rc = sasl_client_start(conn_, chosen_mech_.mechanism.c_str(), &client_interact, - (const char **) &buf, &buflen, &chosen_mech); - } catch (const LockFailure& e) { - state_ = kFailure; - return std::make_pair( Status::MutexError("mutex that guards sasl_client_new unable to lock"), "" ); - } - - - switch (rc) { - case SASL_OK: state_ = kSuccess; - break; - case SASL_CONTINUE: state_ = kWaitingForData; - break; - default: state_ = kFailure; - return std::make_pair( SaslError(rc), ""); - break; - } // switch( rc) - - // Cyrus will free this buffer when the connection is shut down - token = std::string( buf, buflen); - return std::make_pair( Status::OK(), token); - -} // start() method - -std::pair<Status, std::string> CySaslEngine::Step(const std::string data) -{ - char * output = NULL; - unsigned int outlen = 0; - sasl_interact_t * client_interact = NULL; - - if (state_ != kWaitingForData) - LOG_WARN(kRPC, << "CySaslEngine::step when state is " << state_); - - int rc = 0; - try { - LockGuard saslGuard(getSaslMutex()); - rc = sasl_client_step(conn_, data.c_str(), data.size(), &client_interact, - (const char **) &output, &outlen); - } catch (const LockFailure& e) { - state_ = kFailure; - return std::make_pair( Status::MutexError("mutex that guards sasl_client_new unable to lock"), "" ); - } - // right now, state_ == kWaitingForData, - // so update state_, to reflect _step()'s result: - switch (rc) { - case SASL_OK: state_ = kSuccess; 
break; - case SASL_CONTINUE: state_ = kWaitingForData; break; - default: state_ = kFailure; - return std::make_pair(SaslError(rc), ""); - break; - } // switch( rc) - return std::make_pair(Status::OK(), std::string( output,outlen)); -} // step() method - -Status CySaslEngine::Finish() -{ - if (state_ != kSuccess && state_ != kFailure && state_ != kErrorState ) - LOG_WARN(kRPC, << "CySaslEngine::finish when state is " << state_); - - if (conn_ != nullptr) { - try { - LockGuard saslGuard(getSaslMutex()); - sasl_dispose( &conn_); - conn_ = NULL; - } catch (const LockFailure& e) { - return Status::MutexError("mutex that guards sasl_dispose unable to lock"); - } - } - - return Status::OK(); -} - -////////////////////////////////////////////////// -// Internal callbacks, for sasl_init_client(). // -// Mostly lifted from cyrus' sample_client.c . // -// Implicitly called in a context that already // -// holds the SASL/GSSAPI lock. // -////////////////////////////////////////////////// - -static int -sasl_my_log(void *context __attribute__((unused)), - int priority, - const char *message) -{ - if (! 
message) - return SASL_BADPARAM; - - //TODO: get client, connection ID in here - switch (priority) { - case SASL_LOG_NONE: return SASL_OK; // no-op - case SASL_LOG_ERR: // fall through to FAIL - case SASL_LOG_FAIL: - LOG_ERROR(kRPC, << "SASL Error: " << message); - break; - case SASL_LOG_WARN: - LOG_ERROR(kRPC, << message); - break; - case SASL_LOG_NOTE: - LOG_INFO(kRPC, << message); - break; - case SASL_LOG_DEBUG: - LOG_DEBUG(kRPC, << message); - break; - case SASL_LOG_TRACE: - LOG_TRACE(kRPC, << message); - break; - case SASL_LOG_PASS: return SASL_OK; // don't log password-info - default: - LOG_WARN(kRPC, << "Unknown SASL log level(" << priority << "): " << message); - break; - } - - return SASL_OK; -} // sasl_my_log() callback - -static int -sasl_getopt(void *context, const char *plugin_name, - const char *option, - const char **result, unsigned *len) -{ - if (plugin_name) { - LOG_WARN(kRPC, << "CySaslEngine: Unexpected plugin_name " << plugin_name); - return SASL_OK; - } // 123456789012345678 - if (! strncmp( option, "canon_user_plugin", 18)) { - // TODO: maybe write a canon_user_plugin to do user-to-principal mapping - *result = "INTERNAL"; - if (len) *len = strlen( *result); - return SASL_OK; - } // 12345678901234567 - if (! strncmp( option, "client_mech_list", 17)) { - *result = "GSSAPI"; - if (len) *len = strlen( *result); - return SASL_OK; - } - - (void) context; // unused - return SASL_OK; } - -#define PLUGINDIR "/usr/local/lib/sasl2" // where the mechanisms are - -static int -get_path(void *context, const char ** path) -{ - const char *searchpath = (const char *) context; - - if (! path) - return SASL_BADPARAM; - - // TODO: check the SASL_PATH environment, or will Cyrus pass that in in the context? 
- if (searchpath) { - *path = searchpath; - } else { - *path = PLUGINDIR; - } - - return SASL_OK; -} // getpath() callback - -int get_name(void *context, - int id, - const char **result, - unsigned *len) -{ - const CySaslEngine * pThis = (const CySaslEngine *) context; - - if (!result) - return SASL_BADPARAM; - - switch (id) { - case SASL_CB_AUTHNAME: - if (!pThis->id_) - break; - if (len) - *len = pThis->id_->size(); - *result = pThis->id_->c_str(); - break; - case SASL_CB_USER: - if (!pThis->principal_) - break; - if (len) - *len = pThis->principal_->size(); - *result = pThis->principal_->c_str(); - break; - case SASL_CB_LANGUAGE: - *result = NULL; - if (len) - *len = 0; - break; - default: - return SASL_BADPARAM; - } - - LOG_DEBUG(kRPC, << "Cyrus::get_name: returning " << *result); - - return SASL_OK; -} // simple() callback - -int getrealm(void *context, - int id, - const char **availrealms, - const char **result) -{ - (void)availrealms; // unused - const CySaslEngine * pThis = (const CySaslEngine *) context; - - if (!result) - return SASL_BADPARAM; - - if (id != SASL_CB_GETREALM) return SASL_FAIL; - if (pThis->realm_) - *result = pThis->realm_->c_str(); - - return SASL_OK; -} // getrealm() callback - - -/***************************************************************************** -* CYRUS PER-PROCESS INITIALIZATION -*/ - - -const sasl_callback_t per_process_callbacks[] = { - { SASL_CB_LOG, (sasl_callback_ft) & sasl_my_log, NULL}, - { SASL_CB_GETOPT, (sasl_callback_ft) & sasl_getopt, NULL}, - { SASL_CB_GETPATH, (sasl_callback_ft) & get_path, NULL}, // to find th mechanisms - { SASL_CB_LIST_END, (sasl_callback_ft) NULL, NULL} - }; // callbacks_ array - -CyrusPerProcessData::CyrusPerProcessData() -{ - try { - LockGuard saslGuard(getSaslMutex()); - int init_rc = sasl_client_init(per_process_callbacks); - init_status_ = make_status(init_rc); - } catch (const LockFailure& e) { - init_status_ = Status::MutexError("mutex protecting process-wide sasl_client_init 
unable to lock"); - } -} - -CyrusPerProcessData::~CyrusPerProcessData() -{ - // Undo sasl_client_init()) - try { - LockGuard saslGuard(getSaslMutex()); - sasl_done(); - } catch (const LockFailure& e) { - // Not can be done at this point, but the process is most likely shutting down anyway. - LOG_ERROR(kRPC, << "mutex protecting process-wide sasl_done unable to lock"); - } - -} - -Status CyrusPerProcessData::Init() -{ - return GetInstance().init_status_; -} - -CyrusPerProcessData & CyrusPerProcessData::GetInstance() -{ - // Meyer's singleton, thread safe and lazily initialized in C++11 - // - // Must be lazily initialized to allow client code to plug in a GSSAPI mutex - // implementation. - static CyrusPerProcessData per_process_data; - return per_process_data; -} - - -} // namespace hdfs http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a542fb3/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/cyrus_sasl_engine.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/cyrus_sasl_engine.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/cyrus_sasl_engine.h deleted file mode 100644 index 7c0f4e1..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/cyrus_sasl_engine.h +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef LIB_RPC_CYRUS_SASLENGINE_H -#define LIB_RPC_CYRUS_SASLENGINE_H - -#include "sasl/sasl.h" -#include "sasl_engine.h" - -namespace hdfs -{ - -class CySaslEngine : public SaslEngine -{ -public: - CySaslEngine(); - virtual ~CySaslEngine(); - - virtual std::pair<Status, std::string> Start(); - virtual std::pair<Status, std::string> Step(const std::string data); - virtual Status Finish(); -private: - Status InitCyrusSasl(); - Status SaslError(int rc); - - friend int get_name(void *, int, const char **, unsigned *); - friend int getrealm(void *, int, const char **availrealms, const char **); - - sasl_conn_t * conn_; - std::vector<sasl_callback_t> per_connection_callbacks_; -}; //class CySaslEngine - -} // namespace hdfs - -#endif /* LIB_RPC_CYRUS_SASLENGINE_H */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a542fb3/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/gsasl_engine.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/gsasl_engine.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/gsasl_engine.cc deleted file mode 100644 index 7705c81..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/gsasl_engine.cc +++ /dev/null @@ -1,230 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "hdfspp/locks.h" - -#include <sstream> -#include <gsasl.h> -#include "sasl_engine.h" -#include "gsasl_engine.h" -#include "common/logging.h" - - -namespace hdfs { - - -/***************************************************************************** - * GSASL UTILITY FUNCTIONS - */ - -static Mutex *getSaslMutex() { - return LockManager::getGssapiMutex(); -} - -static Status rc_to_status(int rc) -{ - if (rc == GSASL_OK) { - return Status::OK(); - } else { - std::ostringstream ss; - ss << "Cannot initialize client (" << rc << "): " << gsasl_strerror(rc); - return Status::Error(ss.str().c_str()); - } -} - -static -std::pair<Status, std::string> base64_encode(const std::string & in) { - char * temp; - size_t len; - std::string retval; - (void)base64_encode; - - int rc = gsasl_base64_to(in.c_str(), in.size(), &temp, &len); - - if (rc != GSASL_OK) { - return std::make_pair(rc_to_status(rc), ""); - } - - if (temp) { - retval = temp; - free(temp); - } - - if (!temp || retval.length() != len) { - return std::make_pair(Status::Error("SaslEngine: Failed to encode string to base64"), ""); - } - - return std::make_pair(Status::OK(), retval); -} - -/***************************************************************************** - * GSASL ENGINE - */ - -GSaslEngine::~GSaslEngine() 
-{ - // These should already be called in this->Finish - try { - LockGuard saslGuard(getSaslMutex()); - if (session_ != nullptr) { - gsasl_finish(session_); - } - - if (ctx_ != nullptr) { - gsasl_done(ctx_); - } - } catch (const LockFailure& e) { - if(session_ || ctx_) { - LOG_ERROR(kRPC, << "GSaslEngine::~GSaslEngine@" << this << " unable to dispose of gsasl state: " << e.what()); - } - } -} - -Status GSaslEngine::gsasl_new() { - int status = GSASL_OK; - - if (ctx_) return Status::OK(); - - try { - LockGuard saslGuard(getSaslMutex()); - status = gsasl_init( & ctx_); - } catch (const LockFailure& e) { - return Status::MutexError("Mutex that guards gsasl_init unable to lock"); - } - - switch ( status) { - case GSASL_OK: - return Status::OK(); - case GSASL_MALLOC_ERROR: - LOG_WARN(kRPC, << "GSaslEngine: Out of memory."); - return Status::Error("SaslEngine: Out of memory."); - default: - LOG_WARN(kRPC, << "GSaslEngine: Unexpected error." << status); - return Status::Error("SaslEngine: Unexpected error."); - } -} // gsasl_new() - -std::pair<Status, std::string> -GSaslEngine::Start() -{ - int rc; - Status status; - - this->gsasl_new(); - - /* Create new authentication session. 
*/ - try { - LockGuard saslGuard(getSaslMutex()); - rc = gsasl_client_start(ctx_, chosen_mech_.mechanism.c_str(), &session_); - } catch (const LockFailure& e) { - state_ = kErrorState; - return std::make_pair(Status::MutexError("Mutex that guards gsasl_client_start unable to lock"), ""); - } - if (rc != GSASL_OK) { - state_ = kErrorState; - return std::make_pair( rc_to_status( rc), std::string("")); - } - Status init_status = init_kerberos(); - if(!init_status.ok()) { - state_ = kErrorState; - return std::make_pair(init_status, ""); - } - - state_ = kWaitingForData; - - // get from the sasl library the initial token - // that we'll send to the application server: - return this->Step( chosen_mech_.challenge.c_str()); -} // start() method - -Status GSaslEngine::init_kerberos() { - - //TODO: check that we have a principal - try { - LockGuard saslGuard(getSaslMutex()); - // these don't return anything that indicates failure - gsasl_property_set(session_, GSASL_AUTHID, principal_.value().c_str()); - gsasl_property_set(session_, GSASL_HOSTNAME, chosen_mech_.serverid.c_str()); - gsasl_property_set(session_, GSASL_SERVICE, chosen_mech_.protocol.c_str()); - } catch (const LockFailure& e) { - return Status::MutexError("Mutex that guards gsasl_property_set in GSaslEngine::init_kerberos unable to lock"); - } - return Status::OK(); -} - -std::pair<Status, std::string> GSaslEngine::Step(const std::string data) { - if (state_ != kWaitingForData) - LOG_WARN(kRPC, << "GSaslEngine::step when state is " << state_); - - char * output = NULL; - size_t outputSize; - - int rc = 0; - try { - LockGuard saslGuard(getSaslMutex()); - rc = gsasl_step(session_, data.c_str(), data.size(), &output, - &outputSize); - } catch (const LockFailure& e) { - state_ = kFailure; - return std::make_pair(Status::MutexError("Mutex that guards gsasl_client_start unable to lock"), ""); - } - - if (rc == GSASL_NEEDS_MORE || rc == GSASL_OK) { - std::string retval(output, output ? 
outputSize : 0); - if (output) { - free(output); - } - - if (rc == GSASL_OK) { - state_ = kSuccess; - } - - return std::make_pair(Status::OK(), retval); - } - else { - if (output) { - free(output); - } - state_ = kFailure; - return std::make_pair(rc_to_status(rc), ""); - } -} - -Status GSaslEngine::Finish() -{ - if (state_ != kSuccess && state_ != kFailure && state_ != kErrorState ) - LOG_WARN(kRPC, << "GSaslEngine::finish when state is " << state_); - - try { - LockGuard saslGuard(getSaslMutex()); - if (session_ != nullptr) { - gsasl_finish(session_); - session_ = NULL; - } - - if (ctx_ != nullptr) { - gsasl_done(ctx_); - ctx_ = nullptr; - } - } catch (const LockFailure& e) { - return Status::MutexError("Mutex that guards sasl state cleanup in GSaslEngine::Finish unable to lock"); - } - return Status::OK(); -} // finish() method - -} // namespace hdfs http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a542fb3/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/gsasl_engine.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/gsasl_engine.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/gsasl_engine.h deleted file mode 100644 index 331b3fd..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/gsasl_engine.h +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef LIB_RPC_GSASLENGINE_H -#define LIB_RPC_GSASLENGINE_H - -#include <gsasl.h> - -#include "sasl_engine.h" - -namespace hdfs { - -class GSaslEngine : public SaslEngine -{ -public: - GSaslEngine() : SaslEngine(), ctx_(nullptr), session_(nullptr) {} - virtual ~GSaslEngine(); - - virtual std::pair<Status,std::string> Start(); - virtual std::pair<Status,std::string> Step(const std::string data); - virtual Status Finish(); -private: - Status gsasl_new(); - Gsasl * ctx_; - Gsasl_session * session_; - - Status init_kerberos(); -}; - -} // namespace hdfs - -#endif /* LIB_RPC_GSASLENGINE_H */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a542fb3/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.cc deleted file mode 100644 index e83a28c..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.cc +++ /dev/null @@ -1,142 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "namenode_tracker.h" - -#include "common/logging.h" -#include "common/libhdfs_events_impl.h" -#include "common/util.h" - -namespace hdfs { - -static std::string format_endpoints(const std::vector<::asio::ip::tcp::endpoint> &pts) { - std::stringstream ss; - for(unsigned int i=0; i<pts.size(); i++) - if(i == pts.size() - 1) - ss << pts[i]; - else - ss << pts[i] << ", "; - return ss.str(); -} - -HANamenodeTracker::HANamenodeTracker(const std::vector<ResolvedNamenodeInfo> &servers, - ::asio::io_service *ioservice, - std::shared_ptr<LibhdfsEvents> event_handlers) - : enabled_(false), resolved_(false), - ioservice_(ioservice), event_handlers_(event_handlers) -{ - LOG_TRACE(kRPC, << "HANamenodeTracker got the following nodes"); - for(unsigned int i=0;i<servers.size();i++) - LOG_TRACE(kRPC, << servers[i].str()); - - if(servers.size() >= 2) { - LOG_TRACE(kRPC, << "Creating HA namenode tracker"); - if(servers.size() > 2) { - LOG_WARN(kRPC, << "Nameservice declares more than two nodes. Some won't be used."); - } - - active_info_ = servers[0]; - standby_info_ = servers[1]; - LOG_INFO(kRPC, << "HA enabled. Using the following namenodes from the configuration." 
- << "\nNote: Active namenode cannot be determined until a connection has been made.") - LOG_INFO(kRPC, << "First namenode url = " << active_info_.uri.str()); - LOG_INFO(kRPC, << "Second namenode url = " << standby_info_.uri.str()); - - enabled_ = true; - if(!active_info_.endpoints.empty() || !standby_info_.endpoints.empty()) { - resolved_ = true; - } - } -} - -HANamenodeTracker::~HANamenodeTracker() {} - -bool HANamenodeTracker::GetFailoverAndUpdate(const std::vector<::asio::ip::tcp::endpoint>& current_endpoints, - ResolvedNamenodeInfo& out) -{ - mutex_guard swap_lock(swap_lock_); - - // Cannot look up without a key. - if(current_endpoints.size() == 0) { - event_handlers_->call(FS_NN_EMPTY_ENDPOINTS_EVENT, active_info_.nameservice.c_str(), - 0 /*Not much to say about context without endpoints*/); - LOG_ERROR(kRPC, << "HANamenodeTracker@" << this << "::GetFailoverAndUpdate requires at least 1 endpoint."); - return false; - } - - LOG_TRACE(kRPC, << "Swapping from endpoint " << current_endpoints[0]); - - if(IsCurrentActive_locked(current_endpoints[0])) { - std::swap(active_info_, standby_info_); - if(event_handlers_) - event_handlers_->call(FS_NN_FAILOVER_EVENT, active_info_.nameservice.c_str(), - reinterpret_cast<int64_t>(active_info_.uri.str().c_str())); - out = active_info_; - } else if(IsCurrentStandby_locked(current_endpoints[0])) { - // Connected to standby - if(event_handlers_) - event_handlers_->call(FS_NN_FAILOVER_EVENT, active_info_.nameservice.c_str(), - reinterpret_cast<int64_t>(active_info_.uri.str().c_str())); - out = active_info_; - } else { - // Invalid state (or a NIC was added that didn't show up during DNS) - std::stringstream errorMsg; // asio specializes endpoing operator<< for stringstream - errorMsg << "Unable to find RPC connection in config. 
Looked for " << current_endpoints[0] << " in\n" - << format_endpoints(active_info_.endpoints) << " and\n" - << format_endpoints(standby_info_.endpoints) << std::endl; - LOG_ERROR(kRPC, << errorMsg.str()); - return false; - } - - // Extra DNS on swapped node to try and get EPs if it didn't already have them - if(out.endpoints.empty()) { - LOG_WARN(kRPC, << "No endpoints for node " << out.uri.str() << " attempting to resolve again"); - if(!ResolveInPlace(ioservice_, out)) { - // Stuck retrying against the same NN that was able to be resolved in this case - LOG_ERROR(kRPC, << "Fallback endpoint resolution for node " << out.uri.str() - << " failed. Please make sure your configuration is up to date."); - } - } - - return true; -} - - -bool HANamenodeTracker::IsCurrentActive_locked(const ::asio::ip::tcp::endpoint &ep) const { - for(unsigned int i=0;i<active_info_.endpoints.size();i++) { - if(ep.address() == active_info_.endpoints[i].address()) { - if(ep.port() != active_info_.endpoints[i].port()) - LOG_WARN(kRPC, << "Port mismatch: " << ep << " vs " << active_info_.endpoints[i] << " trying anyway.."); - return true; - } - } - return false; -} - -bool HANamenodeTracker::IsCurrentStandby_locked(const ::asio::ip::tcp::endpoint &ep) const { - for(unsigned int i=0;i<standby_info_.endpoints.size();i++) { - if(ep.address() == standby_info_.endpoints[i].address()) { - if(ep.port() != standby_info_.endpoints[i].port()) - LOG_WARN(kRPC, << "Port mismatch: " << ep << " vs " << standby_info_.endpoints[i] << " trying anyway.."); - return true; - } - } - return false; -} - -} // end namespace hdfs http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a542fb3/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.h 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.h deleted file mode 100644 index cc34f51..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.h +++ /dev/null @@ -1,84 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef LIB_RPC_NAMENODE_TRACKER_H -#define LIB_RPC_NAMENODE_TRACKER_H - -#include "common/libhdfs_events_impl.h" -#include "common/namenode_info.h" - -#include <asio/ip/tcp.hpp> - -#include <memory> -#include <mutex> -#include <vector> - -namespace hdfs { - -/* - * Tracker gives the RpcEngine a quick way to use an endpoint that just - * failed in order to lookup a set of endpoints for a failover node. - * - * Note: For now this only deals with 2 NameNodes, but that's the default - * anyway. 
- */ -class HANamenodeTracker { - public: - HANamenodeTracker(const std::vector<ResolvedNamenodeInfo> &servers, - ::asio::io_service *ioservice, - std::shared_ptr<LibhdfsEvents> event_handlers_); - - virtual ~HANamenodeTracker(); - - bool is_enabled() const { return enabled_; } - bool is_resolved() const { return resolved_; } - - // Pass in vector of endpoints held by RpcConnection, use endpoints to infer node - // currently being used. Swap internal state and set out to other node. - // Note: This will always mutate internal state. Use IsCurrentActive/Standby to - // get info without changing state - bool GetFailoverAndUpdate(const std::vector<::asio::ip::tcp::endpoint>& current_endpoints, - ResolvedNamenodeInfo& out); - - private: - // See if endpoint ep is part of the list of endpoints for the active or standby NN - bool IsCurrentActive_locked(const ::asio::ip::tcp::endpoint &ep) const; - bool IsCurrentStandby_locked(const ::asio::ip::tcp::endpoint &ep) const; - - // If HA should be enabled, according to our options and runtime info like # nodes provided - bool enabled_; - // If we were able to resolve at least 1 HA namenode - bool resolved_; - - // Keep service in case a second round of DNS lookup is required - ::asio::io_service *ioservice_; - - // Event handlers, for now this is the simplest place to catch all failover events - // and push info out to client application. Possibly move into RPCEngine. - std::shared_ptr<LibhdfsEvents> event_handlers_; - - // Only support 1 active and 1 standby for now. 
- ResolvedNamenodeInfo active_info_; - ResolvedNamenodeInfo standby_info_; - - // Aquire when switching from active-standby - std::mutex swap_lock_; -}; - -} // end namespace hdfs -#endif // end include guard http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a542fb3/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/request.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/request.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/request.cc deleted file mode 100644 index 356411e..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/request.cc +++ /dev/null @@ -1,179 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - - -#include "request.h" -#include "rpc_engine.h" -#include "sasl_protocol.h" - -#include "RpcHeader.pb.h" -#include "ProtobufRpcEngine.pb.h" -#include "IpcConnectionContext.pb.h" - -#include <sstream> - -namespace hdfs { - -namespace pb = ::google::protobuf; -namespace pbio = ::google::protobuf::io; - -using namespace ::hadoop::common; -using namespace ::std::placeholders; - -static const int kNoRetry = -1; - -// Protobuf helper functions. -// Note/todo: Using the zero-copy protobuf API here makes the simple procedures -// below tricky to read and debug while providing minimal benefit. Reducing -// allocations in BlockReader (HDFS-11266) and smarter use of std::stringstream -// will have a much larger impact according to cachegrind profiles on common -// workloads. -static void AddHeadersToPacket(std::string *res, - std::initializer_list<const pb::MessageLite *> headers, - const std::string *payload) { - int len = 0; - std::for_each( - headers.begin(), headers.end(), - [&len](const pb::MessageLite *v) { len += DelimitedPBMessageSize(v); }); - - if (payload) { - len += payload->size(); - } - - int net_len = htonl(len); - res->reserve(res->size() + sizeof(net_len) + len); - - pbio::StringOutputStream ss(res); - pbio::CodedOutputStream os(&ss); - os.WriteRaw(reinterpret_cast<const char *>(&net_len), sizeof(net_len)); - - uint8_t *buf = os.GetDirectBufferForNBytesAndAdvance(len); - assert(buf); - - std::for_each( - headers.begin(), headers.end(), [&buf](const pb::MessageLite *v) { - buf = pbio::CodedOutputStream::WriteVarint32ToArray(v->ByteSize(), buf); - buf = v->SerializeWithCachedSizesToArray(buf); - }); - - if (payload) { - buf = os.WriteStringToArray(*payload, buf); - } -} - -static void ConstructPayload(std::string *res, const pb::MessageLite *header) { - int len = DelimitedPBMessageSize(header); - res->reserve(len); - pbio::StringOutputStream ss(res); - pbio::CodedOutputStream os(&ss); - uint8_t *buf = os.GetDirectBufferForNBytesAndAdvance(len); - 
assert(buf); - buf = pbio::CodedOutputStream::WriteVarint32ToArray(header->ByteSize(), buf); - buf = header->SerializeWithCachedSizesToArray(buf); -} - -static void SetRequestHeader(std::weak_ptr<LockFreeRpcEngine> weak_engine, int call_id, - const std::string &method_name, int retry_count, - RpcRequestHeaderProto *rpc_header, - RequestHeaderProto *req_header) -{ - // Ensure the RpcEngine is live. If it's not then the FileSystem is being destructed. - std::shared_ptr<LockFreeRpcEngine> counted_engine = weak_engine.lock(); - if(!counted_engine) { - LOG_ERROR(kRPC, << "SetRequestHeader attempted to access an invalid RpcEngine"); - return; - } - - rpc_header->set_rpckind(RPC_PROTOCOL_BUFFER); - rpc_header->set_rpcop(RpcRequestHeaderProto::RPC_FINAL_PACKET); - rpc_header->set_callid(call_id); - if (retry_count != kNoRetry) { - rpc_header->set_retrycount(retry_count); - } - rpc_header->set_clientid(counted_engine->client_id()); - req_header->set_methodname(method_name); - req_header->set_declaringclassprotocolname(counted_engine->protocol_name()); - req_header->set_clientprotocolversion(counted_engine->protocol_version()); -} - -// Request implementation - -Request::Request(std::shared_ptr<LockFreeRpcEngine> engine, const std::string &method_name, int call_id, - const pb::MessageLite *request, Handler &&handler) - : engine_(engine), - method_name_(method_name), - call_id_(call_id), - timer_(engine->io_service()), - handler_(std::move(handler)), - retry_count_(engine->retry_policy() ? 0 : kNoRetry), - failover_count_(0) -{ - ConstructPayload(&payload_, request); -} - -Request::Request(std::shared_ptr<LockFreeRpcEngine> engine, Handler &&handler) - : engine_(engine), - call_id_(-1/*Handshake ID*/), - timer_(engine->io_service()), - handler_(std::move(handler)), - retry_count_(engine->retry_policy() ? 
0 : kNoRetry), - failover_count_(0) { -} - -void Request::GetPacket(std::string *res) const { - LOG_TRACE(kRPC, << "Request::GetPacket called"); - - if (payload_.empty()) - return; - - RpcRequestHeaderProto rpc_header; - RequestHeaderProto req_header; - SetRequestHeader(engine_, call_id_, method_name_, retry_count_, &rpc_header, - &req_header); - - // SASL messages don't have a request header - if (method_name_ != SASL_METHOD_NAME) - AddHeadersToPacket(res, {&rpc_header, &req_header}, &payload_); - else - AddHeadersToPacket(res, {&rpc_header}, &payload_); -} - -void Request::OnResponseArrived(pbio::CodedInputStream *is, - const Status &status) { - LOG_TRACE(kRPC, << "Request::OnResponseArrived called"); - handler_(is, status); -} - -std::string Request::GetDebugString() const { - // Basic description of this object, aimed at debugging - std::stringstream ss; - ss << "\nRequest Object:\n"; - ss << "\tMethod name = \"" << method_name_ << "\"\n"; - ss << "\tCall id = " << call_id_ << "\n"; - ss << "\tRetry Count = " << retry_count_ << "\n"; - ss << "\tFailover count = " << failover_count_ << "\n"; - return ss.str(); -} - -int Request::IncrementFailoverCount() { - // reset retry count when failing over - retry_count_ = 0; - return failover_count_++; -} - -} // end namespace hdfs http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a542fb3/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/request.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/request.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/request.h deleted file mode 100644 index f195540..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/request.h +++ /dev/null @@ -1,87 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef LIB_RPC_RPC_REQUEST_H -#define LIB_RPC_RPC_REQUEST_H - -#include "hdfspp/status.h" -#include "common/util.h" -#include "common/new_delete.h" - -#include <string> -#include <memory> - -#include <google/protobuf/message_lite.h> -#include <google/protobuf/io/coded_stream.h> -#include <google/protobuf/io/zero_copy_stream_impl_lite.h> - -#include <asio/deadline_timer.hpp> - - -namespace hdfs { - -class LockFreeRpcEngine; -class SaslProtocol; - -/* - * Internal bookkeeping for an outstanding request from the consumer. 
- * - * Threading model: not thread-safe; should only be accessed from a single - * thread at a time - */ -class Request { - public: - MEMCHECKED_CLASS(Request) - typedef std::function<void(::google::protobuf::io::CodedInputStream *is, - const Status &status)> Handler; - - // Constructors will not make any blocking calls while holding the shared_ptr<RpcEngine> - Request(std::shared_ptr<LockFreeRpcEngine> engine, const std::string &method_name, int call_id, - const ::google::protobuf::MessageLite *request, Handler &&callback); - - // Null request (with no actual message) used to track the state of an - // initial Connect call - Request(std::shared_ptr<LockFreeRpcEngine> engine, Handler &&handler); - - int call_id() const { return call_id_; } - std::string method_name() const { return method_name_; } - ::asio::deadline_timer &timer() { return timer_; } - int IncrementRetryCount() { return retry_count_++; } - int IncrementFailoverCount(); - void GetPacket(std::string *res) const; - void OnResponseArrived(::google::protobuf::io::CodedInputStream *is, - const Status &status); - - int get_failover_count() {return failover_count_;} - - std::string GetDebugString() const; - - private: - std::weak_ptr<LockFreeRpcEngine> engine_; - const std::string method_name_; - const int call_id_; - - ::asio::deadline_timer timer_; - std::string payload_; - const Handler handler_; - - int retry_count_; - int failover_count_; -}; - -} // end namespace hdfs -#endif // end include guard http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a542fb3/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h deleted file mode 100644 index 9e54983..0000000 --- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h +++ /dev/null @@ -1,182 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef LIB_RPC_RPC_CONNECTION_H -#define LIB_RPC_RPC_CONNECTION_H - -/* - * Encapsulates a persistent connection to the NameNode, and the sending of - * RPC requests and evaluating their responses. - * - * Can have multiple RPC requests in-flight simultaneously, but they are - * evaluated in-order on the server side in a blocking manner. - * - * Threading model: public interface is thread-safe - * All handlers passed in to method calls will be called from an asio thread, - * and will not be holding any internal RpcConnection locks. 
- */ - -#include "request.h" -#include "common/auth_info.h" -#include "common/libhdfs_events_impl.h" -#include "common/new_delete.h" -#include "hdfspp/status.h" - -#include <functional> -#include <memory> -#include <vector> -#include <deque> -#include <unordered_map> - -namespace hdfs { - -typedef const std::function<void(const Status &)> RpcCallback; - -class LockFreeRpcEngine; -class SaslProtocol; - -class RpcConnection : public std::enable_shared_from_this<RpcConnection> { - public: - MEMCHECKED_CLASS(RpcConnection) - RpcConnection(std::shared_ptr<LockFreeRpcEngine> engine); - virtual ~RpcConnection(); - - // Note that a single server can have multiple endpoints - especially both - // an ipv4 and ipv6 endpoint - virtual void Connect(const std::vector<::asio::ip::tcp::endpoint> &server, - const AuthInfo & auth_info, - RpcCallback &handler) = 0; - virtual void ConnectAndFlush(const std::vector<::asio::ip::tcp::endpoint> &server) = 0; - virtual void Disconnect() = 0; - - void StartReading(); - void AsyncRpc(const std::string &method_name, - const ::google::protobuf::MessageLite *req, - std::shared_ptr<::google::protobuf::MessageLite> resp, - const RpcCallback &handler); - - void AsyncRpc(const std::vector<std::shared_ptr<Request> > & requests); - - // Enqueue requests before the connection is connected. 
Will be flushed - // on connect - void PreEnqueueRequests(std::vector<std::shared_ptr<Request>> requests); - - // Put requests at the front of the current request queue - void PrependRequests_locked(std::vector<std::shared_ptr<Request>> requests); - - void SetEventHandlers(std::shared_ptr<LibhdfsEvents> event_handlers); - void SetClusterName(std::string cluster_name); - void SetAuthInfo(const AuthInfo& auth_info); - - std::weak_ptr<LockFreeRpcEngine> engine() { return engine_; } - ::asio::io_service *GetIoService(); - - protected: - struct Response { - enum ResponseState { - kReadLength, - kReadContent, - kParseResponse, - } state_; - unsigned length_; - std::vector<char> data_; - - std::unique_ptr<::google::protobuf::io::ArrayInputStream> ar; - std::unique_ptr<::google::protobuf::io::CodedInputStream> in; - - Response() : state_(kReadLength), length_(0) {} - }; - - - // Initial handshaking protocol: connect->handshake-->(auth)?-->context->connected - virtual void SendHandshake(RpcCallback &handler) = 0; - void HandshakeComplete(const Status &s); - void AuthComplete(const Status &s, const AuthInfo & new_auth_info); - void AuthComplete_locked(const Status &s, const AuthInfo & new_auth_info); - virtual void SendContext(RpcCallback &handler) = 0; - void ContextComplete(const Status &s); - - virtual void OnSendCompleted(const ::asio::error_code &ec, - size_t transferred) = 0; - virtual void OnRecvCompleted(const ::asio::error_code &ec, - size_t transferred) = 0; - virtual void FlushPendingRequests()=0; // Synchronously write the next request - - void AsyncRpc_locked( - const std::string &method_name, - const ::google::protobuf::MessageLite *req, - std::shared_ptr<::google::protobuf::MessageLite> resp, - const RpcCallback &handler); - void SendRpcRequests(const std::vector<std::shared_ptr<Request> > & requests); - void AsyncFlushPendingRequests(); // Queue requests to be flushed at a later time - - - - std::shared_ptr<std::string> PrepareHandshakePacket(); - 
std::shared_ptr<std::string> PrepareContextPacket(); - static std::string SerializeRpcRequest(const std::string &method_name, - const ::google::protobuf::MessageLite *req); - - Status HandleRpcResponse(std::shared_ptr<Response> response); - void HandleRpcTimeout(std::shared_ptr<Request> req, - const ::asio::error_code &ec); - void CommsError(const Status &status); - - void ClearAndDisconnect(const ::asio::error_code &ec); - std::shared_ptr<Request> RemoveFromRunningQueue(int call_id); - - std::weak_ptr<LockFreeRpcEngine> engine_; - std::shared_ptr<Response> current_response_state_; - AuthInfo auth_info_; - - // Connection can have deferred connection, especially when we're pausing - // during retry - enum ConnectedState { - kNotYetConnected, - kConnecting, - kHandshaking, - kAuthenticating, - kConnected, - kDisconnected - }; - static std::string ToString(ConnectedState connected); - ConnectedState connected_; - - // State machine for performing a SASL handshake - std::shared_ptr<SaslProtocol> sasl_protocol_; - // The request being sent over the wire; will also be in sent_requests_ - std::shared_ptr<Request> outgoing_request_; - // Requests to be sent over the wire - std::deque<std::shared_ptr<Request>> pending_requests_; - // Requests to be sent over the wire during authentication; not retried if - // there is a connection error - std::deque<std::shared_ptr<Request>> auth_requests_; - // Requests that are waiting for responses - typedef std::unordered_map<int, std::shared_ptr<Request>> SentRequestMap; - SentRequestMap sent_requests_; - - std::shared_ptr<LibhdfsEvents> event_handlers_; - std::string cluster_name_; - - // Lock for mutable parts of this class that need to be thread safe - std::mutex connection_state_lock_; - - friend class SaslProtocol; -}; - -} // end namespace hdfs -#endif // end include Guard --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional 
commands, e-mail: common-commits-h...@hadoop.apache.org