http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.cc new file mode 100644 index 0000000..c3cbf7e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.cc @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "datatransfer.h" + +#include "hdfspp/status.h" + +namespace hdfs { + +namespace DataTransferSaslStreamUtil { + +static const auto kSUCCESS = hadoop::hdfs::DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_SUCCESS; + +using hadoop::hdfs::DataTransferEncryptorMessageProto; + +Status ConvertToStatus(const DataTransferEncryptorMessageProto *msg, std::string *payload) { + using namespace hadoop::hdfs; + auto s = msg->status(); + if (s == DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_ERROR_UNKNOWN_KEY) { + payload->clear(); + return Status::Exception("InvalidEncryptionKeyException", msg->message().c_str()); + } else if (s == DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_ERROR) { + payload->clear(); + return Status::Error(msg->message().c_str()); + } else { + *payload = msg->payload(); + return Status::OK(); + } +} + +void PrepareInitialHandshake(DataTransferEncryptorMessageProto *msg) { + msg->set_status(kSUCCESS); + msg->set_payload(""); +} + +} +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h new file mode 100644 index 0000000..93103c5 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef LIB_READER_DATA_TRANSFER_H_ +#define LIB_READER_DATA_TRANSFER_H_ + +#include "common/sasl_authenticator.h" +#include "common/async_stream.h" +#include "connection/datanodeconnection.h" +#include <memory> + + +namespace hdfs { + +enum { + kDataTransferVersion = 28, + kDataTransferSasl = 0xdeadbeef, +}; + +enum Operation { + kWriteBlock = 80, + kReadBlock = 81, +}; + +template <class Stream> class DataTransferSaslStream : public DataNodeConnection { +public: + DataTransferSaslStream(std::shared_ptr<Stream> stream, const std::string &username, + const std::string &password) + : stream_(stream), authenticator_(username, password) {} + + template <class Handler> void Handshake(const Handler &next); + + void async_read_some(const MutableBuffers &buf, + std::function<void (const asio::error_code & error, + std::size_t bytes_transferred) > handler) override { + stream_->async_read_some(buf, handler); + } + + void async_write_some(const ConstBuffers &buf, + std::function<void (const asio::error_code & error, + std::size_t bytes_transferred) > handler) override { + stream_->async_write_some(buf, handler); + } + + void Connect(std::function<void(Status status, std::shared_ptr<DataNodeConnection> dn)> handler) override + {(void)handler; /*TODO: Handshaking goes here*/}; + + void Cancel(); +private: + DataTransferSaslStream(const DataTransferSaslStream &) = delete; + DataTransferSaslStream &operator=(const DataTransferSaslStream &) = delete; + std::shared_ptr<Stream> stream_; + DigestMD5Authenticator authenticator_; + struct ReadSaslMessage; + struct Authenticator; +}; +} + +#include "datatransfer_impl.h" + +#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h new file mode 100644 index 0000000..77e618d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef LIB_READER_DATATRANFER_IMPL_H_ +#define LIB_READER_DATATRANFER_IMPL_H_ + +#include "datatransfer.pb.h" +#include "common/continuation/continuation.h" +#include "common/continuation/asio.h" +#include "common/continuation/protobuf.h" + +#include <asio/read.hpp> +#include <asio/buffer.hpp> + +namespace hdfs { + +namespace DataTransferSaslStreamUtil { +Status +ConvertToStatus(const ::hadoop::hdfs::DataTransferEncryptorMessageProto *msg, + std::string *payload); +void PrepareInitialHandshake( + ::hadoop::hdfs::DataTransferEncryptorMessageProto *msg); +} + +template <class Stream> +struct DataTransferSaslStream<Stream>::Authenticator + : continuation::Continuation { + Authenticator(DigestMD5Authenticator *authenticator, + const std::string *request, + hadoop::hdfs::DataTransferEncryptorMessageProto *msg) + : authenticator_(authenticator), request_(request), msg_(msg) {} + + virtual void Run(const Next &next) override { + using namespace ::hadoop::hdfs; + std::string response; + Status status = authenticator_->EvaluateResponse(*request_, &response); + msg_->Clear(); + if (status.ok()) { + // TODO: Handle encryption scheme + msg_->set_payload(response); + msg_->set_status( + DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_SUCCESS); + } else { + msg_->set_status( + DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_ERROR); + } + next(Status::OK()); + } + +private: + DigestMD5Authenticator *authenticator_; + const std::string *request_; + hadoop::hdfs::DataTransferEncryptorMessageProto *msg_; +}; + +template <class Stream> +struct DataTransferSaslStream<Stream>::ReadSaslMessage + : continuation::Continuation { + ReadSaslMessage(std::shared_ptr<Stream> stream, std::string *data) + : stream_(stream), data_(data), read_pb_(stream, &resp_) {} + + virtual void Run(const Next &next) override { + auto handler = [this, next](const Status &status) { + if (status.ok()) { + Status new_stat = + DataTransferSaslStreamUtil::ConvertToStatus(&resp_, 
data_); + next(new_stat); + } else { + next(status); + } + }; + read_pb_.Run(handler); + } + +private: + std::shared_ptr<Stream> stream_; + std::string *data_; + hadoop::hdfs::DataTransferEncryptorMessageProto resp_; + continuation::ReadDelimitedPBMessageContinuation<Stream, 1024> read_pb_; +}; + +template <class Stream> +template <class Handler> +void DataTransferSaslStream<Stream>::Handshake(const Handler &next) { + using ::hadoop::hdfs::DataTransferEncryptorMessageProto; + using ::hdfs::asio_continuation::Write; + using ::hdfs::continuation::WriteDelimitedPBMessage; + + static const int kMagicNumber = htonl(kDataTransferSasl); + static const asio::const_buffers_1 kMagicNumberBuffer = asio::buffer( + reinterpret_cast<const char *>(kMagicNumber), sizeof(kMagicNumber)); + + struct State { + DataTransferEncryptorMessageProto req0; + std::string resp0; + DataTransferEncryptorMessageProto req1; + std::string resp1; + std::shared_ptr<Stream> stream; + }; + auto m = continuation::Pipeline<State>::Create(); + State *s = &m->state(); + s->stream = stream_; + + DataTransferSaslStreamUtil::PrepareInitialHandshake(&s->req0); + + m->Push(Write(stream_, kMagicNumberBuffer)) + .Push(WriteDelimitedPBMessage(stream_, &s->req0)) + .Push(new ReadSaslMessage(stream_, &s->resp0)) + .Push(new Authenticator(&authenticator_, &s->resp0, &s->req1)) + .Push(WriteDelimitedPBMessage(stream_, &s->req1)) + .Push(new ReadSaslMessage(stream_, &s->resp1)); + m->Run([next](const Status &status, const State &) { next(status); }); +} + +template <class Stream> +void DataTransferSaslStream<Stream>::Cancel() { + /* implement with secured reads */ +} + +} + +#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/fileinfo.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/fileinfo.h 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/fileinfo.h new file mode 100644 index 0000000..2fb42a6 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/fileinfo.h @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIB_READER_FILEINFO_H_ +#define LIB_READER_FILEINFO_H_ + +#include "ClientNamenodeProtocol.pb.h" + +namespace hdfs { + +/** + * Information that is assumed to be unchanging about a file for the duration of + * the operations. 
 */
struct FileInfo {
  // Presumably the total length of the file in bytes -- confirm against the
  // code that populates this struct.
  unsigned long long file_length_;
  // NOTE(review): name suggests the file is still open for write; verify.
  bool under_construction_;
  // NOTE(review): name suggests whether the final block is complete; verify.
  bool last_block_complete_;
  // Located-block protobuf records for the file. Uses std::vector; <vector>
  // is not included by this header itself.
  std::vector<::hadoop::hdfs::LocatedBlockProto> blocks_;
};

}

#endif
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/readergroup.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/readergroup.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/readergroup.cc new file mode 100644 index 0000000..a64800a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/readergroup.cc @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "readergroup.h" + +#include <algorithm> + +namespace hdfs { + +void ReaderGroup::AddReader(std::shared_ptr<BlockReader> reader) { + std::lock_guard<std::recursive_mutex> state_lock(state_lock_); + ClearDeadReaders(); + std::weak_ptr<BlockReader> weak_ref = reader; + readers_.push_back(weak_ref); +} + +std::vector<std::shared_ptr<BlockReader>> ReaderGroup::GetLiveReaders() { + std::lock_guard<std::recursive_mutex> state_lock(state_lock_); + + std::vector<std::shared_ptr<BlockReader>> live_readers; + for(auto it=readers_.begin(); it != readers_.end(); it++) { + std::shared_ptr<BlockReader> live_reader = it->lock(); + if(live_reader) { + live_readers.push_back(live_reader); + } + } + return live_readers; +} + +void ReaderGroup::ClearDeadReaders() { + std::lock_guard<std::recursive_mutex> state_lock(state_lock_); + + auto reader_is_dead = [](const std::weak_ptr<BlockReader> &ptr) { + return ptr.expired(); + }; + + auto it = std::remove_if(readers_.begin(), readers_.end(), reader_is_dead); + readers_.erase(it, readers_.end()); +} + +} // end namespace hdfs http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/readergroup.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/readergroup.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/readergroup.h new file mode 100644 index 0000000..e6173f7 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/readergroup.h @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef READER_READER_GROUP_H_ +#define READER_READER_GROUP_H_ + +#include "block_reader.h" + +#include <memory> +#include <vector> +#include <mutex> + +namespace hdfs { + +/** + * Provide a way of logically grouping ephemeral block readers + * so that their status can be monitored or changed. + * + * Note: This does not attempt to extend the reader life + * cycle. Readers are assumed to be owned by something else + * using a shared_ptr. 
+ **/ + +class ReaderGroup { + public: + ReaderGroup() {}; + void AddReader(std::shared_ptr<BlockReader> reader); + /* find live readers, promote to shared_ptr */ + std::vector<std::shared_ptr<BlockReader>> GetLiveReaders(); + private: + /* remove weak_ptrs that don't point to live object */ + void ClearDeadReaders(); + std::recursive_mutex state_lock_; + std::vector<std::weak_ptr<BlockReader>> readers_; +}; + +} // end namespace hdfs +#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/CMakeLists.txt new file mode 100644 index 0000000..e5a26fb --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/CMakeLists.txt @@ -0,0 +1,31 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
#

# Sources that are always compiled into the rpc object library.
list(APPEND rpc_object_items rpc_connection_impl.cc rpc_engine.cc namenode_tracker.cc request.cc sasl_protocol.cc sasl_engine.cc)
# The Cyrus SASL engine is added only when CMAKE_USING_CYRUS_SASL is set.
if (CMAKE_USING_CYRUS_SASL)
  list(APPEND rpc_object_items cyrus_sasl_engine.cc)
endif (CMAKE_USING_CYRUS_SASL)
# The GSASL engine is added only when CMAKE_USING_GSASL is set.
if (CMAKE_USING_GSASL)
  list(APPEND rpc_object_items gsasl_engine.cc)
endif (CMAKE_USING_GSASL)

# Object library holding the compiled rpc sources.
add_library(rpc_obj OBJECT ${rpc_object_items})


# rpc_obj is built after the proto target (presumably so the generated
# protobuf headers exist first -- confirm against the proto target).
add_dependencies(rpc_obj proto)
# Library assembled from the object files of rpc_obj.
add_library(rpc $<TARGET_OBJECTS:rpc_obj>)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/cyrus_sasl_engine.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/cyrus_sasl_engine.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/cyrus_sasl_engine.cc new file mode 100644 index 0000000..5c96ede --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/cyrus_sasl_engine.cc @@ -0,0 +1,469 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "hdfspp/locks.h" + +#include <sys/types.h> +#include "sasl/sasl.h" +#include "sasl/saslutil.h" +#include <string.h> +#include <string> +#include <sstream> +#include <unistd.h> // getpass() ( deprecated) + +#include "common/logging.h" + +#include "sasl_engine.h" +#include "cyrus_sasl_engine.h" + +namespace hdfs { + +static Mutex *getSaslMutex() { + return LockManager::getGssapiMutex(); +} + +// Forward decls of sasl callback functions +typedef int (*sasl_callback_ft)(void); +int get_name(void *context, + int id, + const char **result, + unsigned *len); + +int getrealm(void *context, + int id, + const char **availrealms, + const char **result); + +// This should be constructed once per process, and destroyed once per process + +class CyrusPerProcessData +{ +public: + static Status Init(); // Can be called many times +private: + CyrusPerProcessData(); + ~CyrusPerProcessData(); + Status init_status_; + + static CyrusPerProcessData & GetInstance(); +}; + + +/***************************************************************************** + * CYRUS UTILITY FUNCTIONS + */ + +// Cyrus-specific error messages: +// errStr() is the non-method version, to +// be called by utility routines. 
+std::string errStr( int rc) { + switch (rc) { + case SASL_NOTINIT: /* -12 */ return "SASL library not initialized"; + case SASL_WRONGMECH:/* -11 */ return "mechanism doesn't support requested feature"; + case SASL_BADSERV: /* -10 */ return "server failed mutual authentication step"; + case SASL_BADMAC: /* -9 */ return "integrity check failed"; + case SASL_TRYAGAIN: /* -8 */ return "transient failure (e.g., weak key)"; + case SASL_BADPARAM: /* -7 */ return "invalid parameter supplied"; + case SASL_NOTDONE: /* -6 */ return "can't request info until later in exchange"; + case SASL_BADPROT: /* -5 */ return "bad protocol / cancel"; + case SASL_NOMECH: /* -4 */ return "mechanism not supported"; + case SASL_BUFOVER: /* -3 */ return "overflowed buffer"; + case SASL_NOMEM: /* -2 */ return "memory shortage failure"; + case SASL_FAIL: /* -1 */ return "generic failure"; + case SASL_OK: /* 0 */ return "successful result"; + case SASL_CONTINUE: /* 1 */ return "another step is needed in authentication"; + case SASL_INTERACT: /* 2 */ return "needs user interaction"; + default: return "unknown error"; + } // switch(rc) +} // errStr() + +Status make_status(int rc) { + if (rc != SASL_OK && + rc != SASL_CONTINUE && + rc != SASL_INTERACT) { + return Status::AuthenticationFailed(errStr(rc).c_str()); + } + return Status::OK(); +} + +// SaslError() method: Use this when a method needs +// to update the engine's state. 
+Status CySaslEngine::SaslError( int rc) { + Status status = make_status(rc); + if (!status.ok()) + state_ = kErrorState; + + return status; +} + + +/***************************************************************************** +* Cyrus SASL ENGINE +*/ + +CySaslEngine::CySaslEngine() : SaslEngine(), conn_(nullptr) +{ + // Create an array of callbacks that embed a pointer to this + // so we can call methods of the engine + per_connection_callbacks_ = { + { SASL_CB_USER, (sasl_callback_ft) & get_name, this}, // userid for authZ + { SASL_CB_AUTHNAME, (sasl_callback_ft) & get_name, this}, // authid for authT + { SASL_CB_GETREALM, (sasl_callback_ft) & getrealm, this}, // krb/gssapi realm + // { SASL_CB_PASS, (sasl_callback_ft)&getsecret, this + { SASL_CB_LIST_END, (sasl_callback_ft) NULL, NULL} + }; +} + +// Cleanup of last resort. Call Finish to allow a safer check on disposal +CySaslEngine::~CySaslEngine() +{ + + if (conn_) { + try { + LockGuard saslGuard(getSaslMutex()); + sasl_dispose( &conn_); // undo sasl_client_new() + } catch (const LockFailure& e) { + LOG_ERROR(kRPC, << "Unable to dispose of SASL context due to " << e.what()); + } + } +} // destructor + +// initialize some cyrus sasl context stuff: + +Status CySaslEngine::InitCyrusSasl() +{ + int rc = SASL_OK; + + // set up some callbacks once per process: + Status init_status = CyrusPerProcessData::Init(); + if (!init_status.ok()) + return init_status; + + // Initialize the sasl_li brary with per-connection configuration: + const char * fqdn = chosen_mech_.serverid.c_str(); + const char * proto = chosen_mech_.protocol.c_str(); + + try { + LockGuard saslGuard(getSaslMutex()); + rc = sasl_client_new(proto, fqdn, NULL, NULL, &per_connection_callbacks_[0], 0, &conn_); + if (rc != SASL_OK) { + return SaslError(rc); + } + } catch (const LockFailure& e) { + return Status::MutexError("mutex that guards sasl_client_new unable to lock"); + } + + return Status::OK(); +} // cysasl_new() + +// start() method: Ask the Sasl 
ibrary, "How do we +// ask the hdfs server for service? +std::pair<Status, std::string> +CySaslEngine::Start() +{ + int rc; + Status status; + + if (state_ != kUnstarted) + LOG_WARN(kRPC, << "CySaslEngine::start() when state is " << state_); + + status = InitCyrusSasl(); + + if ( !status.ok()) { + state_ = kErrorState; + return std::make_pair( status, ""); + } + + sasl_interact_t * client_interact = NULL; + char * buf; + unsigned int buflen; + const char * chosen_mech; + std::string token; + + try { + LockGuard saslGuard(getSaslMutex()); + rc = sasl_client_start(conn_, chosen_mech_.mechanism.c_str(), &client_interact, + (const char **) &buf, &buflen, &chosen_mech); + } catch (const LockFailure& e) { + state_ = kFailure; + return std::make_pair( Status::MutexError("mutex that guards sasl_client_new unable to lock"), "" ); + } + + + switch (rc) { + case SASL_OK: state_ = kSuccess; + break; + case SASL_CONTINUE: state_ = kWaitingForData; + break; + default: state_ = kFailure; + return std::make_pair( SaslError(rc), ""); + break; + } // switch( rc) + + // Cyrus will free this buffer when the connection is shut down + token = std::string( buf, buflen); + return std::make_pair( Status::OK(), token); + +} // start() method + +std::pair<Status, std::string> CySaslEngine::Step(const std::string data) +{ + char * output = NULL; + unsigned int outlen = 0; + sasl_interact_t * client_interact = NULL; + + if (state_ != kWaitingForData) + LOG_WARN(kRPC, << "CySaslEngine::step when state is " << state_); + + int rc = 0; + try { + LockGuard saslGuard(getSaslMutex()); + rc = sasl_client_step(conn_, data.c_str(), data.size(), &client_interact, + (const char **) &output, &outlen); + } catch (const LockFailure& e) { + state_ = kFailure; + return std::make_pair( Status::MutexError("mutex that guards sasl_client_new unable to lock"), "" ); + } + // right now, state_ == kWaitingForData, + // so update state_, to reflect _step()'s result: + switch (rc) { + case SASL_OK: state_ = kSuccess; 
break; + case SASL_CONTINUE: state_ = kWaitingForData; break; + default: state_ = kFailure; + return std::make_pair(SaslError(rc), ""); + break; + } // switch( rc) + return std::make_pair(Status::OK(), std::string( output,outlen)); +} // step() method + +Status CySaslEngine::Finish() +{ + if (state_ != kSuccess && state_ != kFailure && state_ != kErrorState ) + LOG_WARN(kRPC, << "CySaslEngine::finish when state is " << state_); + + if (conn_ != nullptr) { + try { + LockGuard saslGuard(getSaslMutex()); + sasl_dispose( &conn_); + conn_ = NULL; + } catch (const LockFailure& e) { + return Status::MutexError("mutex that guards sasl_dispose unable to lock"); + } + } + + return Status::OK(); +} + +////////////////////////////////////////////////// +// Internal callbacks, for sasl_init_client(). // +// Mostly lifted from cyrus' sample_client.c . // +// Implicitly called in a context that already // +// holds the SASL/GSSAPI lock. // +////////////////////////////////////////////////// + +static int +sasl_my_log(void *context __attribute__((unused)), + int priority, + const char *message) +{ + if (! 
message) + return SASL_BADPARAM; + + //TODO: get client, connection ID in here + switch (priority) { + case SASL_LOG_NONE: return SASL_OK; // no-op + case SASL_LOG_ERR: // fall through to FAIL + case SASL_LOG_FAIL: + LOG_ERROR(kRPC, << "SASL Error: " << message); + break; + case SASL_LOG_WARN: + LOG_ERROR(kRPC, << message); + break; + case SASL_LOG_NOTE: + LOG_INFO(kRPC, << message); + break; + case SASL_LOG_DEBUG: + LOG_DEBUG(kRPC, << message); + break; + case SASL_LOG_TRACE: + LOG_TRACE(kRPC, << message); + break; + case SASL_LOG_PASS: return SASL_OK; // don't log password-info + default: + LOG_WARN(kRPC, << "Unknown SASL log level(" << priority << "): " << message); + break; + } + + return SASL_OK; +} // sasl_my_log() callback + +static int +sasl_getopt(void *context, const char *plugin_name, + const char *option, + const char **result, unsigned *len) +{ + if (plugin_name) { + LOG_WARN(kRPC, << "CySaslEngine: Unexpected plugin_name " << plugin_name); + return SASL_OK; + } // 123456789012345678 + if (! strncmp( option, "canon_user_plugin", 18)) { + // TODO: maybe write a canon_user_plugin to do user-to-principal mapping + *result = "INTERNAL"; + if (len) *len = strlen( *result); + return SASL_OK; + } // 12345678901234567 + if (! strncmp( option, "client_mech_list", 17)) { + *result = "GSSAPI"; + if (len) *len = strlen( *result); + return SASL_OK; + } + + (void) context; // unused + return SASL_OK; } + +#define PLUGINDIR "/usr/local/lib/sasl2" // where the mechanisms are + +static int +get_path(void *context, const char ** path) +{ + const char *searchpath = (const char *) context; + + if (! path) + return SASL_BADPARAM; + + // TODO: check the SASL_PATH environment, or will Cyrus pass that in in the context? 
+ if (searchpath) { + *path = searchpath; + } else { + *path = PLUGINDIR; + } + + return SASL_OK; +} // getpath() callback + +int get_name(void *context, + int id, + const char **result, + unsigned *len) +{ + const CySaslEngine * pThis = (const CySaslEngine *) context; + + if (!result) + return SASL_BADPARAM; + + switch (id) { + case SASL_CB_AUTHNAME: + if (!pThis->id_) + break; + if (len) + *len = pThis->id_->size(); + *result = pThis->id_->c_str(); + break; + case SASL_CB_USER: + if (!pThis->principal_) + break; + if (len) + *len = pThis->principal_->size(); + *result = pThis->principal_->c_str(); + break; + case SASL_CB_LANGUAGE: + *result = NULL; + if (len) + *len = 0; + break; + default: + return SASL_BADPARAM; + } + + LOG_DEBUG(kRPC, << "Cyrus::get_name: returning " << *result); + + return SASL_OK; +} // simple() callback + +int getrealm(void *context, + int id, + const char **availrealms, + const char **result) +{ + (void)availrealms; // unused + const CySaslEngine * pThis = (const CySaslEngine *) context; + + if (!result) + return SASL_BADPARAM; + + if (id != SASL_CB_GETREALM) return SASL_FAIL; + if (pThis->realm_) + *result = pThis->realm_->c_str(); + + return SASL_OK; +} // getrealm() callback + + +/***************************************************************************** +* CYRUS PER-PROCESS INITIALIZATION +*/ + + +const sasl_callback_t per_process_callbacks[] = { + { SASL_CB_LOG, (sasl_callback_ft) & sasl_my_log, NULL}, + { SASL_CB_GETOPT, (sasl_callback_ft) & sasl_getopt, NULL}, + { SASL_CB_GETPATH, (sasl_callback_ft) & get_path, NULL}, // to find th mechanisms + { SASL_CB_LIST_END, (sasl_callback_ft) NULL, NULL} + }; // callbacks_ array + +CyrusPerProcessData::CyrusPerProcessData() +{ + try { + LockGuard saslGuard(getSaslMutex()); + int init_rc = sasl_client_init(per_process_callbacks); + init_status_ = make_status(init_rc); + } catch (const LockFailure& e) { + init_status_ = Status::MutexError("mutex protecting process-wide sasl_client_init 
unable to lock"); + } +} + +CyrusPerProcessData::~CyrusPerProcessData() +{ + // Undo sasl_client_init()) + try { + LockGuard saslGuard(getSaslMutex()); + sasl_done(); + } catch (const LockFailure& e) { + // Not can be done at this point, but the process is most likely shutting down anyway. + LOG_ERROR(kRPC, << "mutex protecting process-wide sasl_done unable to lock"); + } + +} + +Status CyrusPerProcessData::Init() +{ + return GetInstance().init_status_; +} + +CyrusPerProcessData & CyrusPerProcessData::GetInstance() +{ + // Meyer's singleton, thread safe and lazily initialized in C++11 + // + // Must be lazily initialized to allow client code to plug in a GSSAPI mutex + // implementation. + static CyrusPerProcessData per_process_data; + return per_process_data; +} + + +} // namespace hdfs http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/cyrus_sasl_engine.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/cyrus_sasl_engine.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/cyrus_sasl_engine.h new file mode 100644 index 0000000..7c0f4e1 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/cyrus_sasl_engine.h @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef LIB_RPC_CYRUS_SASLENGINE_H +#define LIB_RPC_CYRUS_SASLENGINE_H + +#include "sasl/sasl.h" +#include "sasl_engine.h" + +namespace hdfs +{ + +class CySaslEngine : public SaslEngine +{ +public: + CySaslEngine(); + virtual ~CySaslEngine(); + + virtual std::pair<Status, std::string> Start(); + virtual std::pair<Status, std::string> Step(const std::string data); + virtual Status Finish(); +private: + Status InitCyrusSasl(); + Status SaslError(int rc); + + friend int get_name(void *, int, const char **, unsigned *); + friend int getrealm(void *, int, const char **availrealms, const char **); + + sasl_conn_t * conn_; + std::vector<sasl_callback_t> per_connection_callbacks_; +}; //class CySaslEngine + +} // namespace hdfs + +#endif /* LIB_RPC_CYRUS_SASLENGINE_H */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/gsasl_engine.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/gsasl_engine.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/gsasl_engine.cc new file mode 100644 index 0000000..7705c81 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/gsasl_engine.cc @@ -0,0 +1,230 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "hdfspp/locks.h" + +#include <sstream> +#include <gsasl.h> +#include "sasl_engine.h" +#include "gsasl_engine.h" +#include "common/logging.h" + + +namespace hdfs { + + +/***************************************************************************** + * GSASL UTILITY FUNCTIONS + */ + +static Mutex *getSaslMutex() { + return LockManager::getGssapiMutex(); +} + +static Status rc_to_status(int rc) +{ + if (rc == GSASL_OK) { + return Status::OK(); + } else { + std::ostringstream ss; + ss << "Cannot initialize client (" << rc << "): " << gsasl_strerror(rc); + return Status::Error(ss.str().c_str()); + } +} + +static +std::pair<Status, std::string> base64_encode(const std::string & in) { + char * temp; + size_t len; + std::string retval; + (void)base64_encode; + + int rc = gsasl_base64_to(in.c_str(), in.size(), &temp, &len); + + if (rc != GSASL_OK) { + return std::make_pair(rc_to_status(rc), ""); + } + + if (temp) { + retval = temp; + free(temp); + } + + if (!temp || retval.length() != len) { + return std::make_pair(Status::Error("SaslEngine: Failed to encode string to base64"), ""); + } + + return std::make_pair(Status::OK(), retval); +} + +/***************************************************************************** + * GSASL ENGINE + */ + +GSaslEngine::~GSaslEngine() 
+{ + // These should already be called in this->Finish + try { + LockGuard saslGuard(getSaslMutex()); + if (session_ != nullptr) { + gsasl_finish(session_); + } + + if (ctx_ != nullptr) { + gsasl_done(ctx_); + } + } catch (const LockFailure& e) { + if(session_ || ctx_) { + LOG_ERROR(kRPC, << "GSaslEngine::~GSaslEngine@" << this << " unable to dispose of gsasl state: " << e.what()); + } + } +} + +Status GSaslEngine::gsasl_new() { + int status = GSASL_OK; + + if (ctx_) return Status::OK(); + + try { + LockGuard saslGuard(getSaslMutex()); + status = gsasl_init( & ctx_); + } catch (const LockFailure& e) { + return Status::MutexError("Mutex that guards gsasl_init unable to lock"); + } + + switch ( status) { + case GSASL_OK: + return Status::OK(); + case GSASL_MALLOC_ERROR: + LOG_WARN(kRPC, << "GSaslEngine: Out of memory."); + return Status::Error("SaslEngine: Out of memory."); + default: + LOG_WARN(kRPC, << "GSaslEngine: Unexpected error." << status); + return Status::Error("SaslEngine: Unexpected error."); + } +} // gsasl_new() + +std::pair<Status, std::string> +GSaslEngine::Start() +{ + int rc; + Status status; + + this->gsasl_new(); + + /* Create new authentication session. 
*/ + try { + LockGuard saslGuard(getSaslMutex()); + rc = gsasl_client_start(ctx_, chosen_mech_.mechanism.c_str(), &session_); + } catch (const LockFailure& e) { + state_ = kErrorState; + return std::make_pair(Status::MutexError("Mutex that guards gsasl_client_start unable to lock"), ""); + } + if (rc != GSASL_OK) { + state_ = kErrorState; + return std::make_pair( rc_to_status( rc), std::string("")); + } + Status init_status = init_kerberos(); + if(!init_status.ok()) { + state_ = kErrorState; + return std::make_pair(init_status, ""); + } + + state_ = kWaitingForData; + + // get from the sasl library the initial token + // that we'll send to the application server: + return this->Step( chosen_mech_.challenge.c_str()); +} // start() method + +Status GSaslEngine::init_kerberos() { + + //TODO: check that we have a principal + try { + LockGuard saslGuard(getSaslMutex()); + // these don't return anything that indicates failure + gsasl_property_set(session_, GSASL_AUTHID, principal_.value().c_str()); + gsasl_property_set(session_, GSASL_HOSTNAME, chosen_mech_.serverid.c_str()); + gsasl_property_set(session_, GSASL_SERVICE, chosen_mech_.protocol.c_str()); + } catch (const LockFailure& e) { + return Status::MutexError("Mutex that guards gsasl_property_set in GSaslEngine::init_kerberos unable to lock"); + } + return Status::OK(); +} + +std::pair<Status, std::string> GSaslEngine::Step(const std::string data) { + if (state_ != kWaitingForData) + LOG_WARN(kRPC, << "GSaslEngine::step when state is " << state_); + + char * output = NULL; + size_t outputSize; + + int rc = 0; + try { + LockGuard saslGuard(getSaslMutex()); + rc = gsasl_step(session_, data.c_str(), data.size(), &output, + &outputSize); + } catch (const LockFailure& e) { + state_ = kFailure; + return std::make_pair(Status::MutexError("Mutex that guards gsasl_client_start unable to lock"), ""); + } + + if (rc == GSASL_NEEDS_MORE || rc == GSASL_OK) { + std::string retval(output, output ? 
outputSize : 0); + if (output) { + free(output); + } + + if (rc == GSASL_OK) { + state_ = kSuccess; + } + + return std::make_pair(Status::OK(), retval); + } + else { + if (output) { + free(output); + } + state_ = kFailure; + return std::make_pair(rc_to_status(rc), ""); + } +} + +Status GSaslEngine::Finish() +{ + if (state_ != kSuccess && state_ != kFailure && state_ != kErrorState ) + LOG_WARN(kRPC, << "GSaslEngine::finish when state is " << state_); + + try { + LockGuard saslGuard(getSaslMutex()); + if (session_ != nullptr) { + gsasl_finish(session_); + session_ = NULL; + } + + if (ctx_ != nullptr) { + gsasl_done(ctx_); + ctx_ = nullptr; + } + } catch (const LockFailure& e) { + return Status::MutexError("Mutex that guards sasl state cleanup in GSaslEngine::Finish unable to lock"); + } + return Status::OK(); +} // finish() method + +} // namespace hdfs http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/gsasl_engine.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/gsasl_engine.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/gsasl_engine.h new file mode 100644 index 0000000..331b3fd --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/gsasl_engine.h @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef LIB_RPC_GSASLENGINE_H +#define LIB_RPC_GSASLENGINE_H + +#include <gsasl.h> + +#include "sasl_engine.h" + +namespace hdfs { + +class GSaslEngine : public SaslEngine +{ +public: + GSaslEngine() : SaslEngine(), ctx_(nullptr), session_(nullptr) {} + virtual ~GSaslEngine(); + + virtual std::pair<Status,std::string> Start(); + virtual std::pair<Status,std::string> Step(const std::string data); + virtual Status Finish(); +private: + Status gsasl_new(); + Gsasl * ctx_; + Gsasl_session * session_; + + Status init_kerberos(); +}; + +} // namespace hdfs + +#endif /* LIB_RPC_GSASLENGINE_H */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.cc new file mode 100644 index 0000000..e83a28c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.cc @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "namenode_tracker.h" + +#include "common/logging.h" +#include "common/libhdfs_events_impl.h" +#include "common/util.h" + +namespace hdfs { + +static std::string format_endpoints(const std::vector<::asio::ip::tcp::endpoint> &pts) { + std::stringstream ss; + for(unsigned int i=0; i<pts.size(); i++) + if(i == pts.size() - 1) + ss << pts[i]; + else + ss << pts[i] << ", "; + return ss.str(); +} + +HANamenodeTracker::HANamenodeTracker(const std::vector<ResolvedNamenodeInfo> &servers, + ::asio::io_service *ioservice, + std::shared_ptr<LibhdfsEvents> event_handlers) + : enabled_(false), resolved_(false), + ioservice_(ioservice), event_handlers_(event_handlers) +{ + LOG_TRACE(kRPC, << "HANamenodeTracker got the following nodes"); + for(unsigned int i=0;i<servers.size();i++) + LOG_TRACE(kRPC, << servers[i].str()); + + if(servers.size() >= 2) { + LOG_TRACE(kRPC, << "Creating HA namenode tracker"); + if(servers.size() > 2) { + LOG_WARN(kRPC, << "Nameservice declares more than two nodes. Some won't be used."); + } + + active_info_ = servers[0]; + standby_info_ = servers[1]; + LOG_INFO(kRPC, << "HA enabled. Using the following namenodes from the configuration." 
+ << "\nNote: Active namenode cannot be determined until a connection has been made.") + LOG_INFO(kRPC, << "First namenode url = " << active_info_.uri.str()); + LOG_INFO(kRPC, << "Second namenode url = " << standby_info_.uri.str()); + + enabled_ = true; + if(!active_info_.endpoints.empty() || !standby_info_.endpoints.empty()) { + resolved_ = true; + } + } +} + +HANamenodeTracker::~HANamenodeTracker() {} + +bool HANamenodeTracker::GetFailoverAndUpdate(const std::vector<::asio::ip::tcp::endpoint>& current_endpoints, + ResolvedNamenodeInfo& out) +{ + mutex_guard swap_lock(swap_lock_); + + // Cannot look up without a key. + if(current_endpoints.size() == 0) { + event_handlers_->call(FS_NN_EMPTY_ENDPOINTS_EVENT, active_info_.nameservice.c_str(), + 0 /*Not much to say about context without endpoints*/); + LOG_ERROR(kRPC, << "HANamenodeTracker@" << this << "::GetFailoverAndUpdate requires at least 1 endpoint."); + return false; + } + + LOG_TRACE(kRPC, << "Swapping from endpoint " << current_endpoints[0]); + + if(IsCurrentActive_locked(current_endpoints[0])) { + std::swap(active_info_, standby_info_); + if(event_handlers_) + event_handlers_->call(FS_NN_FAILOVER_EVENT, active_info_.nameservice.c_str(), + reinterpret_cast<int64_t>(active_info_.uri.str().c_str())); + out = active_info_; + } else if(IsCurrentStandby_locked(current_endpoints[0])) { + // Connected to standby + if(event_handlers_) + event_handlers_->call(FS_NN_FAILOVER_EVENT, active_info_.nameservice.c_str(), + reinterpret_cast<int64_t>(active_info_.uri.str().c_str())); + out = active_info_; + } else { + // Invalid state (or a NIC was added that didn't show up during DNS) + std::stringstream errorMsg; // asio specializes endpoing operator<< for stringstream + errorMsg << "Unable to find RPC connection in config. 
Looked for " << current_endpoints[0] << " in\n" + << format_endpoints(active_info_.endpoints) << " and\n" + << format_endpoints(standby_info_.endpoints) << std::endl; + LOG_ERROR(kRPC, << errorMsg.str()); + return false; + } + + // Extra DNS on swapped node to try and get EPs if it didn't already have them + if(out.endpoints.empty()) { + LOG_WARN(kRPC, << "No endpoints for node " << out.uri.str() << " attempting to resolve again"); + if(!ResolveInPlace(ioservice_, out)) { + // Stuck retrying against the same NN that was able to be resolved in this case + LOG_ERROR(kRPC, << "Fallback endpoint resolution for node " << out.uri.str() + << " failed. Please make sure your configuration is up to date."); + } + } + + return true; +} + + +bool HANamenodeTracker::IsCurrentActive_locked(const ::asio::ip::tcp::endpoint &ep) const { + for(unsigned int i=0;i<active_info_.endpoints.size();i++) { + if(ep.address() == active_info_.endpoints[i].address()) { + if(ep.port() != active_info_.endpoints[i].port()) + LOG_WARN(kRPC, << "Port mismatch: " << ep << " vs " << active_info_.endpoints[i] << " trying anyway.."); + return true; + } + } + return false; +} + +bool HANamenodeTracker::IsCurrentStandby_locked(const ::asio::ip::tcp::endpoint &ep) const { + for(unsigned int i=0;i<standby_info_.endpoints.size();i++) { + if(ep.address() == standby_info_.endpoints[i].address()) { + if(ep.port() != standby_info_.endpoints[i].port()) + LOG_WARN(kRPC, << "Port mismatch: " << ep << " vs " << standby_info_.endpoints[i] << " trying anyway.."); + return true; + } + } + return false; +} + +} // end namespace hdfs http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.h 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.h new file mode 100644 index 0000000..cc34f51 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.h @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef LIB_RPC_NAMENODE_TRACKER_H +#define LIB_RPC_NAMENODE_TRACKER_H + +#include "common/libhdfs_events_impl.h" +#include "common/namenode_info.h" + +#include <asio/ip/tcp.hpp> + +#include <memory> +#include <mutex> +#include <vector> + +namespace hdfs { + +/* + * Tracker gives the RpcEngine a quick way to use an endpoint that just + * failed in order to lookup a set of endpoints for a failover node. + * + * Note: For now this only deals with 2 NameNodes, but that's the default + * anyway. 
+ */ +class HANamenodeTracker { + public: + HANamenodeTracker(const std::vector<ResolvedNamenodeInfo> &servers, + ::asio::io_service *ioservice, + std::shared_ptr<LibhdfsEvents> event_handlers_); + + virtual ~HANamenodeTracker(); + + bool is_enabled() const { return enabled_; } + bool is_resolved() const { return resolved_; } + + // Pass in vector of endpoints held by RpcConnection, use endpoints to infer node + // currently being used. Swap internal state and set out to other node. + // Note: This will always mutate internal state. Use IsCurrentActive/Standby to + // get info without changing state + bool GetFailoverAndUpdate(const std::vector<::asio::ip::tcp::endpoint>& current_endpoints, + ResolvedNamenodeInfo& out); + + private: + // See if endpoint ep is part of the list of endpoints for the active or standby NN + bool IsCurrentActive_locked(const ::asio::ip::tcp::endpoint &ep) const; + bool IsCurrentStandby_locked(const ::asio::ip::tcp::endpoint &ep) const; + + // If HA should be enabled, according to our options and runtime info like # nodes provided + bool enabled_; + // If we were able to resolve at least 1 HA namenode + bool resolved_; + + // Keep service in case a second round of DNS lookup is required + ::asio::io_service *ioservice_; + + // Event handlers, for now this is the simplest place to catch all failover events + // and push info out to client application. Possibly move into RPCEngine. + std::shared_ptr<LibhdfsEvents> event_handlers_; + + // Only support 1 active and 1 standby for now. 
+ ResolvedNamenodeInfo active_info_; + ResolvedNamenodeInfo standby_info_; + + // Aquire when switching from active-standby + std::mutex swap_lock_; +}; + +} // end namespace hdfs +#endif // end include guard http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/request.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/request.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/request.cc new file mode 100644 index 0000000..356411e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/request.cc @@ -0,0 +1,179 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +#include "request.h" +#include "rpc_engine.h" +#include "sasl_protocol.h" + +#include "RpcHeader.pb.h" +#include "ProtobufRpcEngine.pb.h" +#include "IpcConnectionContext.pb.h" + +#include <sstream> + +namespace hdfs { + +namespace pb = ::google::protobuf; +namespace pbio = ::google::protobuf::io; + +using namespace ::hadoop::common; +using namespace ::std::placeholders; + +static const int kNoRetry = -1; + +// Protobuf helper functions. +// Note/todo: Using the zero-copy protobuf API here makes the simple procedures +// below tricky to read and debug while providing minimal benefit. Reducing +// allocations in BlockReader (HDFS-11266) and smarter use of std::stringstream +// will have a much larger impact according to cachegrind profiles on common +// workloads. +static void AddHeadersToPacket(std::string *res, + std::initializer_list<const pb::MessageLite *> headers, + const std::string *payload) { + int len = 0; + std::for_each( + headers.begin(), headers.end(), + [&len](const pb::MessageLite *v) { len += DelimitedPBMessageSize(v); }); + + if (payload) { + len += payload->size(); + } + + int net_len = htonl(len); + res->reserve(res->size() + sizeof(net_len) + len); + + pbio::StringOutputStream ss(res); + pbio::CodedOutputStream os(&ss); + os.WriteRaw(reinterpret_cast<const char *>(&net_len), sizeof(net_len)); + + uint8_t *buf = os.GetDirectBufferForNBytesAndAdvance(len); + assert(buf); + + std::for_each( + headers.begin(), headers.end(), [&buf](const pb::MessageLite *v) { + buf = pbio::CodedOutputStream::WriteVarint32ToArray(v->ByteSize(), buf); + buf = v->SerializeWithCachedSizesToArray(buf); + }); + + if (payload) { + buf = os.WriteStringToArray(*payload, buf); + } +} + +static void ConstructPayload(std::string *res, const pb::MessageLite *header) { + int len = DelimitedPBMessageSize(header); + res->reserve(len); + pbio::StringOutputStream ss(res); + pbio::CodedOutputStream os(&ss); + uint8_t *buf = os.GetDirectBufferForNBytesAndAdvance(len); + 
assert(buf); + buf = pbio::CodedOutputStream::WriteVarint32ToArray(header->ByteSize(), buf); + buf = header->SerializeWithCachedSizesToArray(buf); +} + +static void SetRequestHeader(std::weak_ptr<LockFreeRpcEngine> weak_engine, int call_id, + const std::string &method_name, int retry_count, + RpcRequestHeaderProto *rpc_header, + RequestHeaderProto *req_header) +{ + // Ensure the RpcEngine is live. If it's not then the FileSystem is being destructed. + std::shared_ptr<LockFreeRpcEngine> counted_engine = weak_engine.lock(); + if(!counted_engine) { + LOG_ERROR(kRPC, << "SetRequestHeader attempted to access an invalid RpcEngine"); + return; + } + + rpc_header->set_rpckind(RPC_PROTOCOL_BUFFER); + rpc_header->set_rpcop(RpcRequestHeaderProto::RPC_FINAL_PACKET); + rpc_header->set_callid(call_id); + if (retry_count != kNoRetry) { + rpc_header->set_retrycount(retry_count); + } + rpc_header->set_clientid(counted_engine->client_id()); + req_header->set_methodname(method_name); + req_header->set_declaringclassprotocolname(counted_engine->protocol_name()); + req_header->set_clientprotocolversion(counted_engine->protocol_version()); +} + +// Request implementation + +Request::Request(std::shared_ptr<LockFreeRpcEngine> engine, const std::string &method_name, int call_id, + const pb::MessageLite *request, Handler &&handler) + : engine_(engine), + method_name_(method_name), + call_id_(call_id), + timer_(engine->io_service()), + handler_(std::move(handler)), + retry_count_(engine->retry_policy() ? 0 : kNoRetry), + failover_count_(0) +{ + ConstructPayload(&payload_, request); +} + +Request::Request(std::shared_ptr<LockFreeRpcEngine> engine, Handler &&handler) + : engine_(engine), + call_id_(-1/*Handshake ID*/), + timer_(engine->io_service()), + handler_(std::move(handler)), + retry_count_(engine->retry_policy() ? 
0 : kNoRetry), + failover_count_(0) { +} + +void Request::GetPacket(std::string *res) const { + LOG_TRACE(kRPC, << "Request::GetPacket called"); + + if (payload_.empty()) + return; + + RpcRequestHeaderProto rpc_header; + RequestHeaderProto req_header; + SetRequestHeader(engine_, call_id_, method_name_, retry_count_, &rpc_header, + &req_header); + + // SASL messages don't have a request header + if (method_name_ != SASL_METHOD_NAME) + AddHeadersToPacket(res, {&rpc_header, &req_header}, &payload_); + else + AddHeadersToPacket(res, {&rpc_header}, &payload_); +} + +void Request::OnResponseArrived(pbio::CodedInputStream *is, + const Status &status) { + LOG_TRACE(kRPC, << "Request::OnResponseArrived called"); + handler_(is, status); +} + +std::string Request::GetDebugString() const { + // Basic description of this object, aimed at debugging + std::stringstream ss; + ss << "\nRequest Object:\n"; + ss << "\tMethod name = \"" << method_name_ << "\"\n"; + ss << "\tCall id = " << call_id_ << "\n"; + ss << "\tRetry Count = " << retry_count_ << "\n"; + ss << "\tFailover count = " << failover_count_ << "\n"; + return ss.str(); +} + +int Request::IncrementFailoverCount() { + // reset retry count when failing over + retry_count_ = 0; + return failover_count_++; +} + +} // end namespace hdfs http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/request.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/request.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/request.h new file mode 100644 index 0000000..f195540 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/request.h @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIB_RPC_RPC_REQUEST_H +#define LIB_RPC_RPC_REQUEST_H + +#include "hdfspp/status.h" +#include "common/util.h" +#include "common/new_delete.h" + +#include <string> +#include <memory> + +#include <google/protobuf/message_lite.h> +#include <google/protobuf/io/coded_stream.h> +#include <google/protobuf/io/zero_copy_stream_impl_lite.h> + +#include <asio/deadline_timer.hpp> + + +namespace hdfs { + +class LockFreeRpcEngine; +class SaslProtocol; + +/* + * Internal bookkeeping for an outstanding request from the consumer. 
+ * + * Threading model: not thread-safe; should only be accessed from a single + * thread at a time + */ +class Request { + public: + MEMCHECKED_CLASS(Request) + typedef std::function<void(::google::protobuf::io::CodedInputStream *is, + const Status &status)> Handler; + + // Constructors will not make any blocking calls while holding the shared_ptr<RpcEngine> + Request(std::shared_ptr<LockFreeRpcEngine> engine, const std::string &method_name, int call_id, + const ::google::protobuf::MessageLite *request, Handler &&callback); + + // Null request (with no actual message) used to track the state of an + // initial Connect call + Request(std::shared_ptr<LockFreeRpcEngine> engine, Handler &&handler); + + int call_id() const { return call_id_; } + std::string method_name() const { return method_name_; } + ::asio::deadline_timer &timer() { return timer_; } + int IncrementRetryCount() { return retry_count_++; } + int IncrementFailoverCount(); + void GetPacket(std::string *res) const; + void OnResponseArrived(::google::protobuf::io::CodedInputStream *is, + const Status &status); + + int get_failover_count() {return failover_count_;} + + std::string GetDebugString() const; + + private: + std::weak_ptr<LockFreeRpcEngine> engine_; + const std::string method_name_; + const int call_id_; + + ::asio::deadline_timer timer_; + std::string payload_; + const Handler handler_; + + int retry_count_; + int failover_count_; +}; + +} // end namespace hdfs +#endif // end include guard http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h new file mode 100644 index 0000000..9e54983 --- /dev/null +++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h @@ -0,0 +1,182 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIB_RPC_RPC_CONNECTION_H +#define LIB_RPC_RPC_CONNECTION_H + +/* + * Encapsulates a persistent connection to the NameNode, and the sending of + * RPC requests and evaluating their responses. + * + * Can have multiple RPC requests in-flight simultaneously, but they are + * evaluated in-order on the server side in a blocking manner. + * + * Threading model: public interface is thread-safe + * All handlers passed in to method calls will be called from an asio thread, + * and will not be holding any internal RpcConnection locks. 
+ */ + +#include "request.h" +#include "common/auth_info.h" +#include "common/libhdfs_events_impl.h" +#include "common/new_delete.h" +#include "hdfspp/status.h" + +#include <functional> +#include <memory> +#include <vector> +#include <deque> +#include <unordered_map> + +namespace hdfs { + +typedef const std::function<void(const Status &)> RpcCallback; + +class LockFreeRpcEngine; +class SaslProtocol; + +class RpcConnection : public std::enable_shared_from_this<RpcConnection> { + public: + MEMCHECKED_CLASS(RpcConnection) + RpcConnection(std::shared_ptr<LockFreeRpcEngine> engine); + virtual ~RpcConnection(); + + // Note that a single server can have multiple endpoints - especially both + // an ipv4 and ipv6 endpoint + virtual void Connect(const std::vector<::asio::ip::tcp::endpoint> &server, + const AuthInfo & auth_info, + RpcCallback &handler) = 0; + virtual void ConnectAndFlush(const std::vector<::asio::ip::tcp::endpoint> &server) = 0; + virtual void Disconnect() = 0; + + void StartReading(); + void AsyncRpc(const std::string &method_name, + const ::google::protobuf::MessageLite *req, + std::shared_ptr<::google::protobuf::MessageLite> resp, + const RpcCallback &handler); + + void AsyncRpc(const std::vector<std::shared_ptr<Request> > & requests); + + // Enqueue requests before the connection is connected. 
Will be flushed + // on connect + void PreEnqueueRequests(std::vector<std::shared_ptr<Request>> requests); + + // Put requests at the front of the current request queue + void PrependRequests_locked(std::vector<std::shared_ptr<Request>> requests); + + void SetEventHandlers(std::shared_ptr<LibhdfsEvents> event_handlers); + void SetClusterName(std::string cluster_name); + void SetAuthInfo(const AuthInfo& auth_info); + + std::weak_ptr<LockFreeRpcEngine> engine() { return engine_; } + ::asio::io_service *GetIoService(); + + protected: + struct Response { + enum ResponseState { + kReadLength, + kReadContent, + kParseResponse, + } state_; + unsigned length_; + std::vector<char> data_; + + std::unique_ptr<::google::protobuf::io::ArrayInputStream> ar; + std::unique_ptr<::google::protobuf::io::CodedInputStream> in; + + Response() : state_(kReadLength), length_(0) {} + }; + + + // Initial handshaking protocol: connect->handshake-->(auth)?-->context->connected + virtual void SendHandshake(RpcCallback &handler) = 0; + void HandshakeComplete(const Status &s); + void AuthComplete(const Status &s, const AuthInfo & new_auth_info); + void AuthComplete_locked(const Status &s, const AuthInfo & new_auth_info); + virtual void SendContext(RpcCallback &handler) = 0; + void ContextComplete(const Status &s); + + virtual void OnSendCompleted(const ::asio::error_code &ec, + size_t transferred) = 0; + virtual void OnRecvCompleted(const ::asio::error_code &ec, + size_t transferred) = 0; + virtual void FlushPendingRequests()=0; // Synchronously write the next request + + void AsyncRpc_locked( + const std::string &method_name, + const ::google::protobuf::MessageLite *req, + std::shared_ptr<::google::protobuf::MessageLite> resp, + const RpcCallback &handler); + void SendRpcRequests(const std::vector<std::shared_ptr<Request> > & requests); + void AsyncFlushPendingRequests(); // Queue requests to be flushed at a later time + + + + std::shared_ptr<std::string> PrepareHandshakePacket(); + 
std::shared_ptr<std::string> PrepareContextPacket(); + static std::string SerializeRpcRequest(const std::string &method_name, + const ::google::protobuf::MessageLite *req); + + Status HandleRpcResponse(std::shared_ptr<Response> response); + void HandleRpcTimeout(std::shared_ptr<Request> req, + const ::asio::error_code &ec); + void CommsError(const Status &status); + + void ClearAndDisconnect(const ::asio::error_code &ec); + std::shared_ptr<Request> RemoveFromRunningQueue(int call_id); + + std::weak_ptr<LockFreeRpcEngine> engine_; + std::shared_ptr<Response> current_response_state_; + AuthInfo auth_info_; + + // Connection can have deferred connection, especially when we're pausing + // during retry + enum ConnectedState { + kNotYetConnected, + kConnecting, + kHandshaking, + kAuthenticating, + kConnected, + kDisconnected + }; + static std::string ToString(ConnectedState connected); + ConnectedState connected_; + + // State machine for performing a SASL handshake + std::shared_ptr<SaslProtocol> sasl_protocol_; + // The request being sent over the wire; will also be in sent_requests_ + std::shared_ptr<Request> outgoing_request_; + // Requests to be sent over the wire + std::deque<std::shared_ptr<Request>> pending_requests_; + // Requests to be sent over the wire during authentication; not retried if + // there is a connection error + std::deque<std::shared_ptr<Request>> auth_requests_; + // Requests that are waiting for responses + typedef std::unordered_map<int, std::shared_ptr<Request>> SentRequestMap; + SentRequestMap sent_requests_; + + std::shared_ptr<LibhdfsEvents> event_handlers_; + std::string cluster_name_; + + // Lock for mutable parts of this class that need to be thread safe + std::mutex connection_state_lock_; + + friend class SaslProtocol; +}; + +} // end namespace hdfs +#endif // end include Guard --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional 
commands, e-mail: common-commits-h...@hadoop.apache.org