http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/configuration_loader.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/configuration_loader.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/configuration_loader.cc new file mode 100644 index 0000000..691d2ff --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/configuration_loader.cc @@ -0,0 +1,328 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "configuration_loader.h" +#include "common/logging.h" + +#include <fstream> +#include <strings.h> +#include <sstream> +#include <map> +#include <sys/stat.h> +#include <rapidxml/rapidxml.hpp> +#include <rapidxml/rapidxml_utils.hpp> + +namespace hdfs { + +/* + * ConfigurationLoader class + */ + +#if defined(WIN32) || defined(_WIN32) +static const char kFileSeparator = '\\'; +#else +static const char kFileSeparator = '/'; +#endif + +static const char kSearchPathSeparator = ':'; + +bool is_valid_bool(const std::string& raw) { + if (raw.empty()) { + return false; + } + + if (!strcasecmp(raw.c_str(), "true")) { + return true; + } + if (!strcasecmp(raw.c_str(), "false")) { + return true; + } + return false; +} + +bool str_to_bool(const std::string& raw) { + if (!strcasecmp(raw.c_str(), "true")) { + return true; + } + + return false; +} + +ConfigurationLoader::ConfigurationLoader() { + //To create a configuration loader with the default search path + //("$HADOOP_CONF_DIR" or "/etc/hadoop/conf"), call SetDefaultSearchPath(). 
+ ConfigurationLoader::SetDefaultSearchPath(); +} + +void ConfigurationLoader::SetDefaultSearchPath() { + // Try (in order, taking the first valid one): + // $HADOOP_CONF_DIR + // /etc/hadoop/conf + const char * hadoop_conf_dir_env = getenv("HADOOP_CONF_DIR"); + if (hadoop_conf_dir_env) { + std::stringstream ss(hadoop_conf_dir_env); + std::string path; + while (std::getline(ss, path, kSearchPathSeparator)) { + AddToSearchPath(path); + } + } else { + AddToSearchPath("/etc/hadoop/conf"); + } +} + +void ConfigurationLoader::ClearSearchPath() +{ + search_path_.clear(); +} + +void ConfigurationLoader::SetSearchPath(const std::string & searchPath) +{ + search_path_.clear(); + + std::vector<std::string> paths; + std::string::size_type start = 0; + std::string::size_type end = searchPath.find(kSearchPathSeparator); + + while (end != std::string::npos) { + paths.push_back(searchPath.substr(start, end-start)); + start = ++end; + end = searchPath.find(kSearchPathSeparator, start); + } + paths.push_back(searchPath.substr(start, searchPath.length())); + + for (auto path: paths) { + AddToSearchPath(path); + } + +} + +void ConfigurationLoader::AddToSearchPath(const std::string & searchPath) +{ + if (searchPath.empty()) + return; + + if (searchPath.back() != kFileSeparator) { + std::string pathWithSlash(searchPath); + pathWithSlash += kFileSeparator; + search_path_.push_back(pathWithSlash); + } else { + search_path_.push_back(searchPath); + } +} + +std::string ConfigurationLoader::GetSearchPath() +{ + std::stringstream result; + bool first = true; + for(std::string item: search_path_) { + if (!first) { + result << kSearchPathSeparator; + } + + result << item; + first = false; + } + + return result.str(); +} + +Status validateStream(std::istream & stream) { + std::streampos start = stream.tellg(); + stream.seekg(0, std::ios::end); + std::streampos end = stream.tellg(); + stream.seekg(start, std::ios::beg); + + int length = end - start; + + if (length <= 0 || start == -1 || end == -1) + return Status::Error("The configuration file is empty"); + + LOG_DEBUG(kFileSystem, << "validateStream will read a config file of length " << length); + + std::vector<char> raw_bytes((int64_t)length + 1); + stream.read(&raw_bytes[0], length); + raw_bytes[length] = 0; + + try { + rapidxml::xml_document<> dom; + dom.parse<rapidxml::parse_trim_whitespace|rapidxml::parse_validate_closing_tags>(&raw_bytes[0]); + + /* File must contain a single <configuration> stanza */ + auto config_node = dom.first_node("configuration", 0, false); + if (!config_node) { + return Status::Error("The configuration file is missing a 'configuration' tag"); + } + return Status::OK(); + } catch (const rapidxml::parse_error &e) { + size_t location = e.where<char>() - &raw_bytes[0]; + std::string msg = "The configuration file has invalid xml around character " + std::to_string(location); + return Status::Error(msg.c_str()); + } +} + +std::vector<std::pair<std::string, Status> > ConfigurationLoader::ValidateResources(std::vector<std::string> filenames) const +{ + std::vector<std::pair<std::string, Status> > stats; + bool found; + for(auto file: filenames) { + found = false; + for(auto dir: search_path_) { + std::ifstream stream(dir + file); + if ( stream.is_open() ) { + found = true; + stats.push_back(std::make_pair(file,validateStream(stream))); + } else { + LOG_DEBUG(kFileSystem, << dir << file << " was not found"); + } + } + if(!found) { + std::string msg("No directory in the current search path contains the file [" + file + "]"); + 
stats.push_back(std::make_pair(file,Status::PathNotFound(msg.c_str()))); + } + } + return stats; +} + +bool ConfigurationLoader::UpdateMapWithFile(ConfigMap & map, const std::string & path) const +{ + if (path.front() == kFileSeparator) { // Absolute path + std::ifstream stream(path, std::ifstream::in); + if ( stream.is_open() ) { + return UpdateMapWithStream(map, stream); + } else { + return false; + } + } else { // Use search path + for(auto dir: search_path_) { + std::ifstream stream(dir + path); + if ( stream.is_open() ) { + if (UpdateMapWithStream(map, stream)) + return true; + } + } + } + + return false; +} + +bool ConfigurationLoader::UpdateMapWithStream(ConfigMap & map, + std::istream & stream) { + std::streampos start = stream.tellg(); + stream.seekg(0, std::ios::end); + std::streampos end = stream.tellg(); + stream.seekg(start, std::ios::beg); + + int length = end - start; + + if (length <= 0 || start == -1 || end == -1) + return false; + + std::vector<char> raw_bytes((int64_t)length + 1); + stream.read(&raw_bytes[0], length); + raw_bytes[length] = 0; + + return UpdateMapWithBytes(map, raw_bytes); +} + +bool ConfigurationLoader::UpdateMapWithString(ConfigMap & map, + const std::string &xml_data) { + if (xml_data.size() == 0) { + return false; + } + + std::vector<char> raw_bytes(xml_data.begin(), xml_data.end()); + raw_bytes.push_back('\0'); + + bool success = UpdateMapWithBytes(map, raw_bytes); + + if (success) { + return true; + } else { + return false; + } +} + +bool ConfigurationLoader::UpdateMapWithBytes(ConfigMap& map, + std::vector<char>& raw_bytes) { + try { + rapidxml::xml_document<> dom; + dom.parse<rapidxml::parse_trim_whitespace>(&raw_bytes[0]); + + /* File must contain a single <configuration> stanza */ + auto config_node = dom.first_node("configuration", 0, false); + if (!config_node) { + return false; + } + + /* Walk all of the <property> nodes, ignoring the rest */ + for (auto property_node = config_node->first_node("property", 0, false); + property_node; + property_node = property_node->next_sibling("property", 0, false)) { + auto name_node = property_node->first_node("name", 0, false); + auto value_node = property_node->first_node("value", 0, false); + + if (name_node && value_node) { + std::string final_value; + auto final_node = property_node->first_node("final", 0, false); + if (final_node) { + final_value = final_node->value(); + } + UpdateMapWithValue(map, name_node->value(), value_node->value(), final_value); + } + + auto name_attr = property_node->first_attribute("name", 0, false); + auto value_attr = property_node->first_attribute("value", 0, false); + + if (name_attr && value_attr) { + std::string final_value; + auto final_attr = property_node->first_attribute("final", 0, false); + if (final_attr) { + final_value = final_attr->value(); + } + UpdateMapWithValue(map, name_attr->value(), value_attr->value(), final_value); + } + } + + return true; + } catch (const rapidxml::parse_error &e) { + // TODO: Capture the result in a Status object + return false; + } +} + +bool ConfigurationLoader::UpdateMapWithValue(ConfigMap& map, + const std::string& key, const std::string& value, + const std::string& final_text) +{ + std::string caseFixedKey = Configuration::fixCase(key); + auto mapValue = map.find(caseFixedKey); + if (mapValue != map.end() && mapValue->second.final) { + return false; + } + + bool final_value = false; + if (is_valid_bool(final_text)) { + final_value = str_to_bool(final_text); + } + + map[caseFixedKey].value = value; + map[caseFixedKey].final = 
final_value; + return true; +} + +}
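[For orientation, not part of the patch: a minimal caller-side sketch of the loader defined above, paired with the HdfsConfiguration subclass added later in this commit. It assumes the `optional` alias (the std::experimental wrapper these headers use) and URI::str() are visible to the caller; the search-path string is illustrative.]

#include "common/configuration_loader.h"
#include "common/hdfs_configuration.h"

#include <iostream>

using namespace hdfs;

int main() {
  // The constructor seeds the search path from $HADOOP_CONF_DIR, falling
  // back to /etc/hadoop/conf, via SetDefaultSearchPath() above.
  ConfigurationLoader loader;

  // Optionally override it with a ":"-delimited list (paths illustrative).
  loader.SetSearchPath("/etc/hadoop/conf:/opt/hadoop/etc/hadoop");

  // Load the default resource files (HdfsConfiguration adds hdfs-site.xml
  // to the base class's list). An empty optional means nothing loaded.
  optional<HdfsConfiguration> config =
      loader.LoadDefaultResources<HdfsConfiguration>();
  if (!config) {
    std::cerr << "No usable config found in: " << loader.GetSearchPath()
              << std::endl;
    return 1;
  }

  // Interpret the raw key/value pairs as client Options.
  Options options = config->GetOptions();
  std::cout << "defaultFS = " << options.defaultFS.str() << std::endl;
  return 0;
}

[ValidateDefaultResources<HdfsConfiguration>() can be called the same way beforehand to get per-file parse status instead of a single pass/fail.]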
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/configuration_loader.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/configuration_loader.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/configuration_loader.h new file mode 100644 index 0000000..2673a6b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/configuration_loader.h @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef COMMON_CONFIGURATION_BUILDER_H_ +#define COMMON_CONFIGURATION_BUILDER_H_ + +#include "configuration.h" +#include "hdfspp/status.h" + +namespace hdfs { + + +class ConfigurationLoader { +public: + // Creates a new, empty Configuration object + // T must be Configuration or a subclass + template<class T> + T NewConfig(); + + /**************************************************************************** + * LOADING CONFIG FILES + ***************************************************************************/ + + // Loads Configuration XML contained in a string/stream/file and returns a parsed + // Configuration object. + // T must be Configuration or a subclass + template<class T> + optional<T> Load(const std::string &xml_data); + // Streams must be seekable + template<class T> + optional<T> LoadFromStream(std::istream & stream); + // The ConfigurationLoader's search path will be searched for the filename + // unless it is an absolute path + template<class T> + optional<T> LoadFromFile(const std::string &filename); + + // Loads Configuration XML contained in a string and produces a new copy that + // is the union of the src and xml_data + // Any parameters from src will be overwritten by the xml_data unless they + // are marked as "final" in src. + // T must be Configuration or a subclass + template<class T> + optional<T> OverlayResourceString(const T &src, const std::string &xml_data) const; + // Streams must be seekable + template<class T> + optional<T> OverlayResourceStream(const T &src, std::istream &stream) const; + // The ConfigurationLoader's search path will be searched for the filename + // unless it is an absolute path + template<class T> + optional<T> OverlayResourceFile(const T &src, const std::string &path) const; + + // Attempts to update the map. 
If the update failed (because there was + // an existing final value, for example), returns the original map + template<class T> + optional<T> OverlayValue(const T &src, const std::string &key, const std::string &value) const; + + // Returns an instance of the Configuration with all of the default resource + // files loaded. + // T must be Configuration or a subclass + template<class T> + optional<T> LoadDefaultResources(); + + + // Returns a vector of filenames and the corresponding status when validation is attempted. + // If the files can be successfully validated, then the status returned for that file is Status::OK + // The files that are validated are those returned by T::GetDefaultFilenames(). + // T must be Configuration or a subclass + template<class T> + std::vector<std::pair<std::string, Status>> ValidateDefaultResources() const; + + /**************************************************************************** + * SEARCH PATH METHODS + ***************************************************************************/ + + //Creates a configuration loader with the default search path ("$HADOOP_CONF_DIR" or "/etc/hadoop/conf"). + //If you want to explicitly set the entire search path, call ClearSearchPath() first + ConfigurationLoader(); + + // Sets the search path to the default search path (namely, "$HADOOP_CONF_DIR" or "/etc/hadoop/conf") + void SetDefaultSearchPath(); + + // Clears out the search path + void ClearSearchPath(); + // Sets the search path to ":"-delimited paths + void SetSearchPath(const std::string & searchPath); + // Adds an element to the search path + void AddToSearchPath(const std::string & searchPath); + // Returns the search path in ":"-delimited form + std::string GetSearchPath(); + +protected: + using ConfigMap = Configuration::ConfigMap; + + std::vector<std::pair<std::string, Status>> ValidateResources(std::vector<std::string> filenames) const; + + // Updates the src map with data from the XML in the path + // The search path will be searched for the filename + bool UpdateMapWithFile(ConfigMap & map, const std::string & path) const; + + // Updates the src map with data from the XML in the stream + // The stream must be seekable + static bool UpdateMapWithStream(ConfigMap & map, + std::istream & stream); + // Updates the src map with data from the XML + static bool UpdateMapWithString(Configuration::ConfigMap & src, + const std::string &xml_data); + // Updates the src map with data from the XML + static bool UpdateMapWithBytes(Configuration::ConfigMap &map, + std::vector<char> &raw_bytes); + + // Attempts to update the map. 
If the update failed (because there was + // an existing final value, for example), returns false + static bool UpdateMapWithValue(ConfigMap& map, + const std::string& key, const std::string& value, const std::string& final_text); + + std::vector<std::string> search_path_; +}; + +} + +#include "configuration_loader_impl.h" + +#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/configuration_loader_impl.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/configuration_loader_impl.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/configuration_loader_impl.h new file mode 100644 index 0000000..dad5a82 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/configuration_loader_impl.h @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef COMMON_CONFIGURATION_BUILDER_IMPL_H_ +#define COMMON_CONFIGURATION_BUILDER_IMPL_H_ + +namespace hdfs { + + +template<class T> +T ConfigurationLoader::NewConfig() { + return T(); +} + +template<class T> +optional<T> ConfigurationLoader::Load(const std::string &xml_data) { + return OverlayResourceString<T>(T(), xml_data); +} +template<class T> +optional<T> ConfigurationLoader::LoadFromStream(std::istream &stream) { + return OverlayResourceStream<T>(T(), stream); +} +template<class T> +optional<T> ConfigurationLoader::LoadFromFile(const std::string &path) { + return OverlayResourceFile<T>(T(), path); +} + + +template<class T> +optional<T> ConfigurationLoader::OverlayResourceFile(const T& src, const std::string &path) const { + ConfigMap map(src.raw_values_); + bool success = UpdateMapWithFile(map, path); + + if (success) { + return std::experimental::make_optional<T>(map); + } else { + return optional<T>(); + } +} + +template<class T> +optional<T> ConfigurationLoader::OverlayResourceStream(const T& src, std::istream & stream) const { + ConfigMap map(src.raw_values_); + bool success = UpdateMapWithStream(map, stream); + + if (success) { + return std::experimental::make_optional<T>(map); + } else { + return optional<T>(); + } +} + +template<class T> +optional<T> ConfigurationLoader::OverlayResourceString(const T& src, const std::string &xml_data) const { + if (xml_data.size() == 0) { + return optional<T>(); + } + + std::vector<char> raw_bytes(xml_data.begin(), xml_data.end()); + raw_bytes.push_back('\0'); + + ConfigMap map(src.raw_values_); + bool success = UpdateMapWithBytes(map, raw_bytes); + + if (success) { + return std::experimental::make_optional<T>(map); + } else { + return optional<T>(); + } +} + +template<class T> +optional<T> ConfigurationLoader::OverlayValue(const T& src, const std::string &key, const std::string &value) const { + ConfigMap map(src.raw_values_); + UpdateMapWithValue(map, key, value, ""); + + return std::experimental::make_optional<T>(map); +} + +template <class T> +optional<T> ConfigurationLoader::LoadDefaultResources() { + std::vector<std::string> default_filenames = T::GetDefaultFilenames(); + + ConfigMap result; + bool success = false; + + for (auto fn: default_filenames) { + // We succeed if we have loaded data from any file + success |= UpdateMapWithFile(result, fn); + } + + if (success) { + return std::experimental::make_optional<T>(result); + } else { + return optional<T>(); + } +} + +template<class T> +std::vector<std::pair<std::string, Status> > ConfigurationLoader::ValidateDefaultResources() const{ + return ValidateResources(T::GetDefaultFilenames()); +} + + +} + +#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/content_summary.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/content_summary.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/content_summary.cc new file mode 100644 index 0000000..0dca36a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/content_summary.cc @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hdfspp/content_summary.h> +#include <sstream> +#include <iomanip> + +namespace hdfs { + +ContentSummary::ContentSummary() +: length(0), + filecount(0), + directorycount(0), + quota(0), + spaceconsumed(0), + spacequota(0) { +} + +std::string ContentSummary::str(bool include_quota) const { + std::stringstream ss; + if(include_quota){ + ss << this->quota << " " + << spacequota << " " + << spaceconsumed << " "; + } + ss << directorycount << " " + << filecount << " " + << length << " " + << path; + return ss.str(); +} + +std::string ContentSummary::str_du() const { + std::stringstream ss; + ss << std::left << std::setw(10) << length + << path; + return ss.str(); +} + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/asio.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/asio.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/asio.h new file mode 100644 index 0000000..193358f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/asio.h @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef LIB_COMMON_CONTINUATION_ASIO_H_ +#define LIB_COMMON_CONTINUATION_ASIO_H_ + +#include "continuation.h" +#include "common/util.h" + +#include "hdfspp/status.h" + +#include <asio/connect.hpp> +#include <asio/read.hpp> +#include <asio/write.hpp> +#include <asio/ip/tcp.hpp> +#include <memory> + +namespace hdfs { +namespace asio_continuation { + +using namespace continuation; + +template <class Stream, class ConstBufferSequence> +class WriteContinuation : public Continuation { +public: + WriteContinuation(std::shared_ptr<Stream>& stream, const ConstBufferSequence &buffer) + : stream_(stream), buffer_(buffer) {} + + virtual void Run(const Next &next) override { + auto handler = + [next](const asio::error_code &ec, size_t) { next(ToStatus(ec)); }; + asio::async_write(*stream_, buffer_, handler); + } + +private: + // prevent construction from raw ptr + WriteContinuation(Stream *stream, ConstBufferSequence &buffer); + std::shared_ptr<Stream> stream_; + ConstBufferSequence buffer_; +}; + +template <class Stream, class ConstBufferSequence> +static inline Continuation *Write(std::shared_ptr<Stream> stream, + const ConstBufferSequence &buffer) { + return new WriteContinuation<Stream, ConstBufferSequence>(stream, buffer); +} + +} +} + +#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h new file mode 100644 index 0000000..18aa146 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIB_COMMON_CONTINUATION_CONTINUATION_H_ +#define LIB_COMMON_CONTINUATION_CONTINUATION_H_ + +#include "hdfspp/status.h" +#include "common/cancel_tracker.h" + +#include <functional> +#include <memory> +#include <vector> + +namespace hdfs { +namespace continuation { + +class PipelineBase; + +/** + * A continuation is a fragment of runnable code whose execution will + * be scheduled by a \link Pipeline \endlink. + * + * The Continuation class is a building block to implement the + * Continuation Passing Style (CPS) in libhdfs++. 
In CPS, the + * upper-level user specifies the control flow by chaining a sequence + * of continuations explicitly through the \link Run() \endlink method, + * while in traditional imperative programming the sequence of + * statements implicitly specifies the control flow. + * + * See http://en.wikipedia.org/wiki/Continuation for more details. + **/ +class Continuation { +public: + typedef std::function<void(const Status &)> Next; + virtual ~Continuation() = default; + virtual void Run(const Next &next) = 0; + Continuation(const Continuation &) = delete; + Continuation &operator=(const Continuation &) = delete; + +protected: + Continuation() = default; +}; + +/** + * A pipeline schedules the execution of a chain of \link Continuation + * \endlink. The pipeline schedules the execution of continuations + * based on their order in the pipeline, where the next parameter for + * each continuation points to the \link Schedule() \endlink + * method. That way the pipeline executes all scheduled continuations + * in sequence. + * + * The typical use case of a pipeline is executing continuations + * asynchronously. Note that a continuation calls the next + * continuation when it is finished. If the continuation is posted + * into an asynchronous event loop, invoking the next continuation + * can be done in the callback handler in the asynchronous event loop. + * + * The pipeline allocates the memory as follows. A pipeline is always + * allocated on the heap. It owns all the continuations as well as + * the state specified by the user. Both the continuations and the + * state have the same life cycle as the pipeline. The design + * simplifies the problem of ensuring that the executions in the + * asynchronous event loop always hold valid pointers w.r.t. the + * pipeline. The pipeline will automatically deallocate itself right + * after it invokes the callback specified by the user. 
+ **/ +template <class State> class Pipeline { +public: + typedef std::function<void(const Status &, const State &)> UserHandler; + static Pipeline *Create() { return new Pipeline(); } + static Pipeline *Create(CancelHandle cancel_handle) { + return new Pipeline(cancel_handle); + } + Pipeline &Push(Continuation *stage); + void Run(UserHandler &&handler); + State &state() { return state_; } + +private: + State state_; + std::vector<std::unique_ptr<Continuation>> routines_; + size_t stage_; + std::function<void(const Status &, const State &)> handler_; + + Pipeline() : stage_(0), cancel_handle_(CancelTracker::New()) {} + Pipeline(CancelHandle cancel_handle) : stage_(0), cancel_handle_(cancel_handle) {} + ~Pipeline() = default; + void Schedule(const Status &status); + CancelHandle cancel_handle_; +}; + +template <class State> +inline Pipeline<State> &Pipeline<State>::Push(Continuation *stage) { + routines_.emplace_back(std::unique_ptr<Continuation>(stage)); + return *this; +} + +template <class State> +inline void Pipeline<State>::Schedule(const Status &status) { + // catch cancelation signalled from outside of pipeline + if(cancel_handle_->is_canceled()) { + handler_(Status::Canceled(), state_); + routines_.clear(); + delete this; + } else if (!status.ok() || stage_ >= routines_.size()) { + handler_(status, state_); + routines_.clear(); + delete this; + } else { + auto next = routines_[stage_].get(); + ++stage_; + next->Run(std::bind(&Pipeline::Schedule, this, std::placeholders::_1)); + } +} + +template <class State> inline void Pipeline<State>::Run(UserHandler &&handler) { + handler_ = std::move(handler); + Schedule(Status::OK()); +} + +} +} + +#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h new file mode 100644 index 0000000..21e063e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef LIBHDFSPP_COMMON_CONTINUATION_PROTOBUF_H_ +#define LIBHDFSPP_COMMON_CONTINUATION_PROTOBUF_H_ + +#include "common/util.h" + +#include <asio/read.hpp> + +#include <google/protobuf/message_lite.h> +#include <google/protobuf/io/coded_stream.h> +#include <google/protobuf/io/zero_copy_stream_impl_lite.h> + +#include <cassert> + +namespace hdfs { +namespace continuation { + +template <class Stream, size_t MaxMessageSize = 512> +struct ReadDelimitedPBMessageContinuation : public Continuation { + ReadDelimitedPBMessageContinuation(std::shared_ptr<Stream> stream, + ::google::protobuf::MessageLite *msg) + : stream_(stream), msg_(msg) {} + + virtual void Run(const Next &next) override { + namespace pbio = google::protobuf::io; + auto handler = [this, next](const asio::error_code &ec, size_t) { + Status status; + if (ec) { + status = ToStatus(ec); + } else { + pbio::ArrayInputStream as(&buf_[0], buf_.size()); + pbio::CodedInputStream is(&as); + uint32_t size = 0; + bool v = is.ReadVarint32(&size); + assert(v); + (void)v; //avoids unused variable warning + is.PushLimit(size); + msg_->Clear(); + v = msg_->MergeFromCodedStream(&is); + assert(v); + } + next(status); + }; + asio::async_read(*stream_, + asio::buffer(buf_), + std::bind(&ReadDelimitedPBMessageContinuation::CompletionHandler, this, + std::placeholders::_1, std::placeholders::_2), + handler); + } + +private: + size_t CompletionHandler(const asio::error_code &ec, size_t transferred) { + if (ec) { + return 0; + } + + size_t offset = 0, len = 0; + for (size_t i = 0; i + 1 < transferred && i < sizeof(int32_t); ++i) { + len = (len << 7) | (buf_[i] & 0x7f); + if ((uint8_t)buf_.at(i) < 0x80) { + offset = i + 1; + break; + } + } + + assert(offset + len < buf_.size() && "Message is too big"); + return offset ? 
len + offset - transferred : 1; + } + + std::shared_ptr<Stream> stream_; + ::google::protobuf::MessageLite *msg_; + std::array<char, MaxMessageSize> buf_; +}; + +template <class Stream> +struct WriteDelimitedPBMessageContinuation : Continuation { + WriteDelimitedPBMessageContinuation(std::shared_ptr<Stream> stream, + const google::protobuf::MessageLite *msg) + : stream_(stream), msg_(msg) {} + + virtual void Run(const Next &next) override { + bool success = true; + buf_ = SerializeDelimitedProtobufMessage(msg_, &success); + + if(!success) { + next(Status::Error("Unable to serialize protobuf message.")); + return; + } + + asio::async_write(*stream_, asio::buffer(buf_), [next](const asio::error_code &ec, size_t) { next(ToStatus(ec)); } ); + } + +private: + std::shared_ptr<Stream> stream_; + const google::protobuf::MessageLite *msg_; + std::string buf_; +}; + +template <class Stream, size_t MaxMessageSize = 512> +static inline Continuation * +ReadDelimitedPBMessage(std::shared_ptr<Stream> stream, ::google::protobuf::MessageLite *msg) { + return new ReadDelimitedPBMessageContinuation<Stream, MaxMessageSize>(stream, + msg); +} + +template <class Stream> +static inline Continuation * +WriteDelimitedPBMessage(std::shared_ptr<Stream> stream, ::google::protobuf::MessageLite *msg) { + return new WriteDelimitedPBMessageContinuation<Stream>(stream, msg); +} +} +} +#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/fsinfo.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/fsinfo.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/fsinfo.cc new file mode 100644 index 0000000..9f350a8 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/fsinfo.cc @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include <hdfspp/fsinfo.h> +#include <sstream> +#include <iomanip> + +namespace hdfs { + +FsInfo::FsInfo() + : capacity(0), + used(0), + remaining(0), + under_replicated(0), + corrupt_blocks(0), + missing_blocks(0), + missing_repl_one_blocks(0), + blocks_in_future(0) { +} + +std::string FsInfo::str(const std::string fs_name) const { + std::string fs_name_label = "Filesystem"; + std::string size = std::to_string(capacity); + std::string size_label = "Size"; + std::string used = std::to_string(this->used); + std::string used_label = "Used"; + std::string available = std::to_string(remaining); + std::string available_label = "Available"; + std::string use_percentage = std::to_string(this->used * 100 / capacity) + "%"; + std::string use_percentage_label = "Use%"; + std::stringstream ss; + ss << std::left << std::setw(std::max(fs_name.size(), fs_name_label.size())) << fs_name_label + << std::right << std::setw(std::max(size.size(), size_label.size()) + 2) << size_label + << std::right << std::setw(std::max(used.size(), used_label.size()) + 2) << used_label + << std::right << std::setw(std::max(available.size(), available_label.size()) + 2) << available_label + << std::right << std::setw(std::max(use_percentage.size(), use_percentage_label.size()) + 2) << use_percentage_label + << std::endl + << std::left << std::setw(std::max(fs_name.size(), fs_name_label.size())) << fs_name + << std::right << std::setw(std::max(size.size(), size_label.size()) + 2) << size + << std::right << std::setw(std::max(used.size(), used_label.size()) + 2) << used + << std::right << std::setw(std::max(available.size(), available_label.size()) + 2) << available + << std::right << std::setw(std::max(use_percentage.size(), use_percentage_label.size()) + 2) << use_percentage; + return ss.str(); +} + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.cc new file mode 100644 index 0000000..07e2edc --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.cc @@ -0,0 +1,210 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "common/hdfs_configuration.h" +#include "common/logging.h" + +#include <exception> + +#ifndef DEFAULT_SCHEME + #define DEFAULT_SCHEME "hdfs://" +#endif + +namespace hdfs { + +// Constructs a configuration with no search path and no resources loaded +HdfsConfiguration::HdfsConfiguration() : Configuration() {} + +// Constructs a configuration with a copy of the input data +HdfsConfiguration::HdfsConfiguration(ConfigMap &src_map) : Configuration(src_map) {} +HdfsConfiguration::HdfsConfiguration(const ConfigMap &src_map) : Configuration(src_map) {} + +std::vector<std::string> HdfsConfiguration::GetDefaultFilenames() { + auto result = Configuration::GetDefaultFilenames(); + result.push_back("hdfs-site.xml"); + return result; +} + +// Sets a value iff the optional<T> has a value +template <class T, class U> +void OptionalSet(T& target, optional<U> value) { + if (value) + target = *value; +} + +std::vector<std::string> SplitOnComma(const std::string &s, bool include_empty_strings) { + std::vector<std::string> res; + std::string buf; + + for(unsigned int i=0;i<s.size();i++) { + char c = s[i]; + if(c != ',') { + buf += c; + } else { + if(!include_empty_strings && buf.empty()) { + // Skip adding empty strings if needed + continue; + } + res.push_back(buf); + buf.clear(); + } + } + + if(buf.size() > 0) + res.push_back(buf); + + return res; +} + +std::string RemoveSpaces(const std::string &str) { + std::string res; + for(unsigned int i=0; i<str.size(); i++) { + char curr = str[i]; + if(curr != ' ') { + res += curr; + } + } + return res; +} + +// Prepend hdfs:// to string if there isn't already a scheme +// Converts unset optional into empty string +std::string PrependHdfsScheme(optional<std::string> str) { + if(!str) + return ""; + + if(str.value().find("://") == std::string::npos) + return DEFAULT_SCHEME + str.value(); + return str.value(); +} + +// It's either use this, goto, or a lot of returns w/ status checks +struct ha_parse_error : public std::exception { + std::string desc; + ha_parse_error(const std::string &val) : desc(val) {}; + const char *what() const noexcept override { + return desc.c_str(); + }; +}; + +std::vector<NamenodeInfo> HdfsConfiguration::LookupNameService(const std::string &nameservice) { + LOG_TRACE(kRPC, << "HDFSConfiguration@" << this << "::LookupNameService( nameservice=" << nameservice<< " ) called"); + + std::vector<NamenodeInfo> namenodes; + try { + // Find namenodes that belong to nameservice + std::vector<std::string> namenode_ids; + { + std::string service_nodes = std::string("dfs.ha.namenodes.") + nameservice; + optional<std::string> namenode_list = Get(service_nodes); + if(namenode_list) + namenode_ids = SplitOnComma(namenode_list.value(), false); + else + throw ha_parse_error("unable to find " + service_nodes); + + for(unsigned int i=0; i<namenode_ids.size(); i++) { + namenode_ids[i] = RemoveSpaces(namenode_ids[i]); + LOG_INFO(kRPC, << "Namenode: " << namenode_ids[i]); + } + } + + // should this error if we only find 1 NN? + if(namenode_ids.empty()) + throw ha_parse_error("No namenodes found for nameservice " + nameservice); + + // Get URI for each HA namenode + for(auto node_id=namenode_ids.begin(); node_id != namenode_ids.end(); node_id++) { + // find URI + std::string dom_node_name = std::string("dfs.namenode.rpc-address.") + nameservice + "." 
+ *node_id; + + URI uri; + try { + uri = URI::parse_from_string(PrependHdfsScheme(Get(dom_node_name))); + } catch (const uri_parse_error) { + throw ha_parse_error("unable to find " + dom_node_name); + } + + if(uri.str() == "") { + LOG_WARN(kRPC, << "Attempted to read info for nameservice " << nameservice << " node " << dom_node_name << " but didn't find anything.") + } else { + LOG_INFO(kRPC, << "Read the following HA Namenode URI from config" << uri.GetDebugString()); + } + + NamenodeInfo node(nameservice, *node_id, uri); + namenodes.push_back(node); + } + } catch (ha_parse_error e) { + LOG_ERROR(kRPC, << "HA cluster detected but failed because : " << e.what()); + namenodes.clear(); // Don't return inconsistent view + } + return namenodes; +} + +// Interprets the resources to build an Options object +Options HdfsConfiguration::GetOptions() { + Options result; + + OptionalSet(result.rpc_timeout, GetInt(kDfsClientSocketTimeoutKey)); + OptionalSet(result.rpc_connect_timeout, GetInt(kIpcClientConnectTimeoutKey)); + OptionalSet(result.max_rpc_retries, GetInt(kIpcClientConnectMaxRetriesKey)); + OptionalSet(result.rpc_retry_delay_ms, GetInt(kIpcClientConnectRetryIntervalKey)); + OptionalSet(result.defaultFS, GetUri(kFsDefaultFsKey)); + OptionalSet(result.block_size, GetInt(kDfsBlockSizeKey)); + + + OptionalSet(result.failover_max_retries, GetInt(kDfsClientFailoverMaxAttempts)); + OptionalSet(result.failover_connection_max_retries, GetInt(kDfsClientFailoverConnectionRetriesOnTimeouts)); + + // Load all nameservices if it's HA configured + optional<std::string> dfs_nameservices = Get("dfs.nameservices"); + if(dfs_nameservices) { + std::string nameservice = dfs_nameservices.value(); + + std::vector<std::string> all_services = SplitOnComma(nameservice, false); + + // Look up nodes for each nameservice so that FileSystem object can support + // multiple nameservices by ID. + for(const std::string &service : all_services) { + if(service.empty()) + continue; + + LOG_DEBUG(kFileSystem, << "Parsing info for nameservice: " << service); + std::vector<NamenodeInfo> nodes = LookupNameService(service); + if(nodes.empty()) { + LOG_WARN(kFileSystem, << "Nameservice \"" << service << "\" declared in config but nodes aren't"); + } else { + result.services[service] = nodes; + } + } + } + + optional<std::string> authentication_value = Get(kHadoopSecurityAuthenticationKey); + + if (authentication_value ) { + std::string fixed_case_value = fixCase(authentication_value.value()); + if (fixed_case_value == fixCase(kHadoopSecurityAuthentication_kerberos)) + result.authentication = Options::kKerberos; + else + result.authentication = Options::kSimple; + } + + return result; +} + + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.h new file mode 100644 index 0000000..d6f902e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.h @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef COMMON_HDFS_CONFIGURATION_H_ +#define COMMON_HDFS_CONFIGURATION_H_ + +#include "common/configuration.h" +#include "hdfspp/options.h" + +#include <string> +#include <map> +#include <vector> +#include <set> +#include <istream> +#include <stdint.h> + +namespace hdfs { + +class HdfsConfiguration : public Configuration { + public: + // Interprets the resources to build an Options object + Options GetOptions(); + + // Keys to look for in the configuration file + static constexpr const char * kFsDefaultFsKey = "fs.defaultFS"; + static constexpr const char * kDfsClientSocketTimeoutKey = "dfs.client.socket-timeout"; + static constexpr const char * kIpcClientConnectTimeoutKey = "ipc.client.connect.timeout"; + static constexpr const char * kIpcClientConnectMaxRetriesKey = "ipc.client.connect.max.retries"; + static constexpr const char * kIpcClientConnectRetryIntervalKey = "ipc.client.connect.retry.interval"; + static constexpr const char * kHadoopSecurityAuthenticationKey = "hadoop.security.authentication"; + static constexpr const char * kHadoopSecurityAuthentication_simple = "simple"; + static constexpr const char * kHadoopSecurityAuthentication_kerberos = "kerberos"; + static constexpr const char * kDfsBlockSizeKey = "dfs.blocksize"; + + static constexpr const char * kDfsClientFailoverMaxAttempts = "dfs.client.failover.max.attempts"; + static constexpr const char * kDfsClientFailoverConnectionRetriesOnTimeouts = "dfs.client.failover.connection.retries.on.timeouts"; + + +private: + friend class ConfigurationLoader; + + // Constructs a configuration with no search path and no resources loaded + HdfsConfiguration(); + + // Constructs a configuration with some static data + HdfsConfiguration(ConfigMap &src_map); + HdfsConfiguration(const ConfigMap &src_map); + + static std::vector<std::string> GetDefaultFilenames(); + std::vector<NamenodeInfo> LookupNameService(const std::string &nameservice); +}; + +} + +#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.cc new file mode 100644 index 0000000..578b782 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.cc @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "hdfs_ioservice.h" + +#include <thread> +#include <mutex> +#include <vector> + +#include "common/logging.h" + +namespace hdfs { + +IoService::~IoService() {} + +IoService *IoService::New() { + return new IoServiceImpl(); +} + +std::shared_ptr<IoService> IoService::MakeShared() { + return std::make_shared<IoServiceImpl>(); +} + + +unsigned int IoServiceImpl::InitDefaultWorkers() { + LOG_TRACE(kAsyncRuntime, << "IoServiceImpl::InitDefaultWorkers@" << this << " called."); + unsigned int logical_thread_count = std::thread::hardware_concurrency(); +#ifndef DISABLE_CONCURRENT_WORKERS + if(logical_thread_count < 1) { + LOG_WARN(kAsyncRuntime, << "IoServiceImpl::InitDefaultWorkers did not detect any logical processors. Defaulting to 1 worker thread."); + } else { + LOG_DEBUG(kRPC, << "IoServiceImpl::InitDefaultWorkers detected " << logical_thread_count << " logical threads and will spawn a worker for each."); + } +#else + if(logical_thread_count > 0) { + LOG_DEBUG(kAsyncRuntime, << "IoServiceImpl::InitDefaultWorkers: " << logical_thread_count << " threads available. Concurrent workers are disabled so 1 worker thread will be used"); + } + logical_thread_count = 1; +#endif + return InitWorkers(logical_thread_count); +} + +unsigned int IoServiceImpl::InitWorkers(unsigned int thread_count) { +#ifdef DISABLE_CONCURRENT_WORKERS + LOG_DEBUG(kAsyncRuntime, << "IoServiceImpl::InitWorkers: " << thread_count << " threads specified but concurrent workers are disabled so 1 will be used"); + thread_count = 1; +#endif + unsigned int created_threads = 0; + for(unsigned int i=0; i<thread_count; i++) { + bool created = AddWorkerThread(); + if(created) { + created_threads++; + } else { + LOG_DEBUG(kAsyncRuntime, << "IoServiceImpl@" << this << " ::InitWorkers failed to create a worker thread"); + } + } + if(created_threads != thread_count) { + LOG_WARN(kAsyncRuntime, << "IoServiceImpl@" << this << " ::InitWorkers attempted to create " + << thread_count << " but only created " << created_threads + << " worker threads. 
Make sure this process has adequate resources."); + } + return created_threads; +} + +bool IoServiceImpl::AddWorkerThread() { + mutex_guard state_lock(state_lock_); + auto async_worker = [this]() { + this->ThreadStartHook(); + this->Run(); + this->ThreadExitHook(); + }; + worker_threads_.push_back(WorkerPtr( new std::thread(async_worker)) ); + return true; +} + + +void IoServiceImpl::ThreadStartHook() { + mutex_guard state_lock(state_lock_); + LOG_DEBUG(kAsyncRuntime, << "Worker thread #" << std::this_thread::get_id() << " for IoServiceImpl@" << this << " starting"); +} + +void IoServiceImpl::ThreadExitHook() { + mutex_guard state_lock(state_lock_); + LOG_DEBUG(kAsyncRuntime, << "Worker thread #" << std::this_thread::get_id() << " for IoServiceImpl@" << this << " exiting"); +} + +void IoServiceImpl::PostTask(std::function<void(void)>& asyncTask) { + io_service_.post(asyncTask); +} + +void IoServiceImpl::WorkerDeleter::operator()(std::thread *t) { + // It is far too easy to destroy the filesystem (and thus the threadpool) + // from within one of the worker threads, leading to a deadlock. Let's + // provide some explicit protection. + if(t->get_id() == std::this_thread::get_id()) { + LOG_ERROR(kAsyncRuntime, << "FileSystemImpl::WorkerDeleter::operator()(threadptr=" + << t << ") : FATAL: Attempted to destroy a thread pool" + " from within a callback of the thread pool!"); + } + t->join(); + delete t; +} + +// As long as this just forwards to an asio::io_service method it doesn't need a lock +void IoServiceImpl::Run() { + // The IoService executes callbacks provided by library users in the context of worker threads; + // there is no way of preventing those callbacks from throwing but we can at least prevent them + // from escaping this library and crashing the process. + + // As recommended in http://www.boost.org/doc/libs/1_39_0/doc/html/boost_asio/reference/io_service.html#boost_asio.reference.io_service.effect_of_exceptions_thrown_from_handlers + asio::io_service::work work(io_service_); + while(true) + { + try + { + io_service_.run(); + break; + } catch (const std::exception & e) { + LOG_WARN(kFileSystem, << "Unexpected exception in libhdfspp worker thread: " << e.what()); + } catch (...) { + LOG_WARN(kFileSystem, << "Unexpected value not derived from std::exception in libhdfspp worker thread"); + } + } +} + +unsigned int IoServiceImpl::get_worker_thread_count() { + mutex_guard state_lock(state_lock_); + return worker_threads_.size(); +} + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.h new file mode 100644 index 0000000..294252b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.h @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef COMMON_HDFS_IOSERVICE_H_ +#define COMMON_HDFS_IOSERVICE_H_ + +#include "hdfspp/hdfspp.h" + +#include <asio/io_service.hpp> +#include "common/util.h" + +#include <mutex> +#include <thread> + +namespace hdfs { + +// Uncomment this to determine if issues are due to concurrency or logic faults +// If tests still fail with concurrency disabled it's most likely a logic bug +//#define DISABLE_CONCURRENT_WORKERS + +/* + * A thin wrapper over the asio::io_service with a few extras + * -manages its own worker threads + * -some helpers for sharing with multiple modules that need to do async work + */ + +class IoServiceImpl : public IoService { + public: + IoServiceImpl() {} + + virtual unsigned int InitDefaultWorkers() override; + virtual unsigned int InitWorkers(unsigned int thread_count) override; + virtual void PostTask(std::function<void(void)>& asyncTask) override; + virtual void Run() override; + virtual void Stop() override { io_service_.stop(); } + + // Add a single worker thread, in the common case try to avoid this in favor + // of Init[Default]Workers. Public for use by tests and rare cases where a + // client wants very explicit control of threading for performance reasons + // e.g. pinning threads to NUMA nodes. + bool AddWorkerThread(); + + // Be very careful about using this: HDFS-10241 + ::asio::io_service &io_service() { return io_service_; } + unsigned int get_worker_thread_count(); + private: + std::mutex state_lock_; + ::asio::io_service io_service_; + + // For doing logging + resource manager updates on thread start/exit + void ThreadStartHook(); + void ThreadExitHook(); + + // Support for async worker threads + struct WorkerDeleter { + void operator()(std::thread *t); + }; + typedef std::unique_ptr<std::thread, WorkerDeleter> WorkerPtr; + std::vector<WorkerPtr> worker_threads_; +}; + +} + +#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/libhdfs_events_impl.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/libhdfs_events_impl.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/libhdfs_events_impl.cc new file mode 100644 index 0000000..d9f4edf --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/libhdfs_events_impl.cc @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/libhdfs_events_impl.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/libhdfs_events_impl.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/libhdfs_events_impl.cc
new file mode 100644
index 0000000..d9f4edf
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/libhdfs_events_impl.cc
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "libhdfs_events_impl.h"
+
+#include <exception>
+
+namespace hdfs {
+
+/**
+ * Default no-op callback implementations
+ **/
+
+LibhdfsEvents::LibhdfsEvents() : fs_callback(std::experimental::nullopt),
+                                 file_callback(std::experimental::nullopt)
+{}
+
+LibhdfsEvents::~LibhdfsEvents() {}
+
+void LibhdfsEvents::set_fs_callback(const fs_event_callback & callback) {
+  fs_callback = callback;
+}
+
+void LibhdfsEvents::set_file_callback(const file_event_callback & callback) {
+  file_callback = callback;
+}
+
+void LibhdfsEvents::clear_fs_callback() {
+  fs_callback = std::experimental::nullopt;
+}
+
+void LibhdfsEvents::clear_file_callback() {
+  file_callback = std::experimental::nullopt;
+}
+
+event_response LibhdfsEvents::call(const char * event,
+                                   const char * cluster,
+                                   int64_t value)
+{
+  if (fs_callback) {
+    try {
+      return fs_callback->operator()(event, cluster, value);
+    } catch (const std::exception& e) {
+      return event_response::make_caught_std_exception(e.what());
+    } catch (...) {
+      // Arguably calling abort() here would serve as appropriate
+      // punishment for those who throw garbage that isn't derived
+      // from std::exception...
+      return event_response::make_caught_unknown_exception();
+    }
+  } else {
+    return event_response::make_ok();
+  }
+}
+
+event_response LibhdfsEvents::call(const char * event,
+                                   const char * cluster,
+                                   const char * file,
+                                   int64_t value)
+{
+  if (file_callback) {
+    try {
+      return file_callback->operator()(event, cluster, file, value);
+    } catch (const std::exception& e) {
+      return event_response::make_caught_std_exception(e.what());
+    } catch (...) {
+      return event_response::make_caught_unknown_exception();
+    }
+  } else {
+    return event_response::make_ok();
+  }
+}
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/libhdfs_events_impl.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/libhdfs_events_impl.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/libhdfs_events_impl.h
new file mode 100644
index 0000000..e43266a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/libhdfs_events_impl.h
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef LIBHDFSPP_COMMON_LIBHDFSEVENTS_IMPL
+#define LIBHDFSPP_COMMON_LIBHDFSEVENTS_IMPL
+
+#include "hdfspp/events.h"
+#include "common/optional_wrapper.h"
+
+#include <functional>
+
+namespace hdfs {
+
+/**
+ * Users can specify event handlers. Default is a no-op handler.
+ **/
+class LibhdfsEvents {
+public:
+  LibhdfsEvents();
+  virtual ~LibhdfsEvents();
+
+  void set_fs_callback(const fs_event_callback & callback);
+  void set_file_callback(const file_event_callback & callback);
+  void clear_fs_callback();
+  void clear_file_callback();
+
+  event_response call(const char *event,
+                      const char *cluster,
+                      int64_t value);
+
+  event_response call(const char *event,
+                      const char *cluster,
+                      const char *file,
+                      int64_t value);
+private:
+  // Called when fs events occur
+  std::experimental::optional<fs_event_callback> fs_callback;
+
+  // Called when file events occur
+  std::experimental::optional<file_event_callback> file_callback;
+};
+
+}
+#endif
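A hedged sketch of how a client might hook these events, assuming fs_event_callback is a std::function-compatible type with the signature suggested by the first call() overload above (its exact typedef lives in hdfspp/events.h, so treat this as illustrative):

  #include "common/libhdfs_events_impl.h"

  #include <iostream>

  int main() {
    hdfs::LibhdfsEvents events;

    // Observe filesystem events; any exception thrown in here is converted
    // by LibhdfsEvents::call into an event_response rather than escaping.
    events.set_fs_callback([](const char *event, const char *cluster,
                              int64_t value) -> hdfs::event_response {
      std::cout << "event=" << event << " cluster=" << cluster
                << " value=" << value << '\n';
      return hdfs::event_response::make_ok();
    });

    events.call("connect", "cluster-a", 0);  // invented event name
    events.clear_fs_callback();
    return 0;
  }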
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/locks.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/locks.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/locks.cc
new file mode 100644
index 0000000..30dcb44
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/locks.cc
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "hdfspp/locks.h"
+
+#include <mutex>
+
+
+namespace hdfs {
+
+LockGuard::LockGuard(Mutex *m) : _mtx(m) {
+  if(!m) {
+    throw LockFailure("LockGuard passed invalid (null) Mutex pointer");
+  }
+  _mtx->lock();
+}
+
+LockGuard::~LockGuard() {
+  if(_mtx) {
+    _mtx->unlock();
+  }
+}
+
+
+// Basic mutexes to use as default. Just a wrapper around C++11 std::mutex.
+class DefaultMutex : public Mutex {
+ public:
+  DefaultMutex() {}
+
+  void lock() override {
+    // Could throw in here if the implementation couldn't lock for some reason.
+    _mtx.lock();
+  }
+
+  void unlock() override {
+    _mtx.unlock();
+  }
+
+  std::string str() override {
+    return "DefaultMutex";
+  }
+ private:
+  std::mutex _mtx;
+};
+
+DefaultMutex defaultTestMutex;
+DefaultMutex defaultGssapiMutex;
+
+// LockManager static var instantiation
+Mutex *LockManager::TEST_default_mutex = &defaultTestMutex;
+Mutex *LockManager::gssapiMtx = &defaultGssapiMutex;
+std::mutex LockManager::_state_lock;
+bool LockManager::_finalized = false;
+
+bool LockManager::InitLocks(Mutex *gssapi) {
+  std::lock_guard<std::mutex> guard(_state_lock);
+
+  // You get one shot to set this - swapping the locks
+  // out while they're in use gets risky. It can still be done by
+  // using the Mutex as a proxy object if one understands
+  // the implied risk of doing so.
+  if(_finalized)
+    return false;
+
+  gssapiMtx = gssapi;
+  _finalized = true;
+  return true;
+}
+
+Mutex *LockManager::getGssapiMutex() {
+  std::lock_guard<std::mutex> guard(_state_lock);
+  return gssapiMtx;
+}
+
+Mutex *LockManager::TEST_get_default_mutex() {
+  return TEST_default_mutex;
+}
+
+void LockManager::TEST_reset_manager() {
+  _finalized = false;
+  // user still responsible for cleanup
+  gssapiMtx = &defaultGssapiMutex;
+}
+
+} // end namespace hdfs
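For reviewers, a hedged sketch of the one-shot handoff InitLocks enforces: an application wraps its own lock in the Mutex interface and installs it once, before any GSSAPI use (the AppMutex name below is invented for illustration):

  #include "hdfspp/locks.h"

  #include <mutex>
  #include <string>

  // Hypothetical application-supplied lock; anything honoring hdfs::Mutex works.
  class AppMutex : public hdfs::Mutex {
   public:
    void lock() override { _mtx.lock(); }
    void unlock() override { _mtx.unlock(); }
    std::string str() override { return "AppMutex"; }
   private:
    std::mutex _mtx;
  };

  int main() {
    static AppMutex app_gssapi_mutex;

    // The first call wins and finalizes the manager...
    bool installed = hdfs::LockManager::InitLocks(&app_gssapi_mutex);
    // ...so a second call is rejected rather than swapping a lock in use.
    bool second = hdfs::LockManager::InitLocks(&app_gssapi_mutex);

    {
      // Serialize a (hypothetical) GSSAPI call site.
      hdfs::LockGuard guard(hdfs::LockManager::getGssapiMutex());
    }
    return (installed && !second) ? 0 : 1;
  }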
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.cc
new file mode 100644
index 0000000..94bce83
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.cc
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "logging.h"
+
+#include <ctime>
+#include <cstring>
+#include <thread>
+#include <iostream>
+#include <sstream>
+
+namespace hdfs
+{
+
+LogManager::LogManager() {}
+std::unique_ptr<LoggerInterface> LogManager::logger_impl_(new StderrLogger());
+std::mutex LogManager::impl_lock_;
+uint32_t LogManager::component_mask_ = 0xFFFFFFFF;
+uint32_t LogManager::level_threshold_ = kWarning;
+
+void LogManager::DisableLogForComponent(LogSourceComponent c) {
+  // AND with all bits other than the one we want to unset
+  std::lock_guard<std::mutex> impl_lock(impl_lock_);
+  component_mask_ &= ~c;
+}
+
+void LogManager::EnableLogForComponent(LogSourceComponent c) {
+  // OR with the bit to set
+  std::lock_guard<std::mutex> impl_lock(impl_lock_);
+  component_mask_ |= c;
+}
+
+void LogManager::SetLogLevel(LogLevel level) {
+  std::lock_guard<std::mutex> impl_lock(impl_lock_);
+  level_threshold_ = level;
+}
+
+void LogManager::Write(const LogMessage& msg) {
+  std::lock_guard<std::mutex> impl_lock(impl_lock_);
+  if(logger_impl_)
+    logger_impl_->Write(msg);
+}
+
+void LogManager::SetLoggerImplementation(std::unique_ptr<LoggerInterface> impl) {
+  std::lock_guard<std::mutex> impl_lock(impl_lock_);
+  logger_impl_.reset(impl.release());
+}
+
+
+/**
+ * Simple plugin to dump logs to stderr
+ **/
+void StderrLogger::Write(const LogMessage& msg) {
+  std::stringstream formatted;
+
+  if(show_level_)
+    formatted << msg.level_string();
+
+  if(show_component_)
+    formatted << msg.component_string();
+
+  if(show_timestamp_) {
+    time_t current_time = std::time(nullptr);
+    char timestr[128];
+    memset(timestr, 0, 128);
+    int res = std::strftime(timestr, 128, "%a %b %e %H:%M:%S %Y", std::localtime(&current_time));
+    if(res > 0) {
+      formatted << '[' << (const char*)timestr << ']';
+    } else {
+      formatted << "[Error formatting timestamp]";
+    }
+  }
+
+  if(show_thread_) {
+    formatted << "[Thread id = " << std::this_thread::get_id() << ']';
+  }
+
+  if(show_file_) {
+    // __FILE__ contains an absolute path, which is giant if doing a build inside
+    // the Hadoop tree. Trim it down to be relative to libhdfspp/
+    std::string abs_path(msg.file_name());
+    size_t rel_path_idx = abs_path.find("libhdfspp/");
+    // Default to the whole string if the library is being built in an odd way
+    if(rel_path_idx == std::string::npos)
+      rel_path_idx = 0;
+
+    formatted << '[' << (const char*)&abs_path[rel_path_idx] << ":" << msg.file_line() << ']';
+  }
+
+  std::cerr << formatted.str() << " " << msg.MsgString() << std::endl;
+}
+
+void StderrLogger::set_show_timestamp(bool show) {
+  show_timestamp_ = show;
+}
+void StderrLogger::set_show_level(bool show) {
+  show_level_ = show;
+}
+void StderrLogger::set_show_thread(bool show) {
+  show_thread_ = show;
+}
+void StderrLogger::set_show_component(bool show) {
+  show_component_ = show;
+}
+
+
+LogMessage::~LogMessage() {
+  LogManager::Write(*this);
+}
+
+LogMessage& LogMessage::operator<<(const std::string *str) {
+  if(str)
+    msg_buffer_ << *str;
+  else
+    msg_buffer_ << "<nullptr>";
+  return *this;
+}
+
+LogMessage& LogMessage::operator<<(const std::string& str) {
+  msg_buffer_ << str;
+  return *this;
+}
+
+LogMessage& LogMessage::operator<<(const ::asio::ip::tcp::endpoint& endpoint) {
+  msg_buffer_ << endpoint;
+  return *this;
+}
+
+LogMessage& LogMessage::operator<<(const char *str) {
+  if(str)
+    msg_buffer_ << str;
+  else
+    msg_buffer_ << "<nullptr>";
+  return *this;
+}
+
+LogMessage& LogMessage::operator<<(bool val) {
+  if(val)
+    msg_buffer_ << "true";
+  else
+    msg_buffer_ << "false";
+  return *this;
+}
+
+LogMessage& LogMessage::operator<<(int32_t val) {
+  msg_buffer_ << val;
+  return *this;
+}
+
+LogMessage& LogMessage::operator<<(uint32_t val) {
+  msg_buffer_ << val;
+  return *this;
+}
+
+LogMessage& LogMessage::operator<<(int64_t val) {
+  msg_buffer_ << val;
+  return *this;
+}
+
+LogMessage& LogMessage::operator<<(uint64_t val) {
+  msg_buffer_ << val;
+  return *this;
+}
+
+LogMessage& LogMessage::operator<<(void *ptr) {
+  msg_buffer_ << ptr;
+  return *this;
+}
+
+
+LogMessage& LogMessage::operator<<(const std::thread::id& tid) {
+  msg_buffer_ << tid;
+  return *this;
+}
+
+std::string LogMessage::MsgString() const {
+  return msg_buffer_.str();
+}
+
+const char * kLevelStrings[5] = {
+  "[TRACE ]",
+  "[DEBUG ]",
+  "[INFO  ]",
+  "[WARN  ]",
+  "[ERROR ]"
+};
+
+const char * LogMessage::level_string() const {
+  return kLevelStrings[level_];
+}
+
+const char * kComponentStrings[6] = {
+  "[Unknown       ]",
+  "[RPC           ]",
+  "[BlockReader   ]",
+  "[FileHandle    ]",
+  "[FileSystem    ]",
+  "[Async Runtime ]",
+};
+
+const char * LogMessage::component_string() const {
+  switch(component_) {
+    case kRPC: return kComponentStrings[1];
+    case kBlockReader: return kComponentStrings[2];
+    case kFileHandle: return kComponentStrings[3];
+    case kFileSystem: return kComponentStrings[4];
+    case kAsyncRuntime: return kComponentStrings[5];
+    default: return kComponentStrings[0];
+  }
+}
+
+}
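Since SetLoggerImplementation is the extension point here, a hedged sketch of a minimal replacement logger and the macro-based call site it would receive (the file target and setup code are illustrative only):

  #include "common/logging.h"

  #include <fstream>
  #include <memory>

  using namespace hdfs;  // the LOG_* macros expand to unqualified hdfs:: names

  // Minimal plugin: route formatted messages to a file instead of stderr.
  class FileLogger : public LoggerInterface {
   public:
    explicit FileLogger(const char *path) : out_(path, std::ios::app) {}
    void Write(const LogMessage& msg) override {
      out_ << msg.level_string() << msg.component_string()
           << ' ' << msg.MsgString() << '\n';
    }
   private:
    std::ofstream out_;
  };

  int main() {
    LogManager::SetLoggerImplementation(
        std::unique_ptr<LoggerInterface>(new FileLogger("/tmp/hdfs.log")));
    LogManager::SetLogLevel(kDebug);
    LogManager::EnableLogForComponent(kRPC);

    // Call sites always go through the macros so ShouldLog() can cheaply gate
    // construction of the (relatively expensive) LogMessage/stringstream.
    LOG_DEBUG(kRPC, << "connected in " << 42 << "ms");
    return 0;
  }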
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.h
new file mode 100644
index 0000000..69f9c6e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.h
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef LIB_COMMON_LOGGING_H_
+#define LIB_COMMON_LOGGING_H_
+
+#include <asio/ip/tcp.hpp>
+
+#include "hdfspp/log.h"
+
+#include <iostream>
+#include <sstream>
+#include <mutex>
+#include <memory>
+#include <thread>
+
+namespace hdfs {
+
+/**
+ * Logging mechanism to provide lightweight logging to stderr, as well as
+ * a callback mechanism to allow C clients and larger third party libs
+ * to handle logging. When adding a new log message to the library use
+ * the macros defined below (LOG_TRACE..LOG_ERROR) rather than using the
+ * LogMessage and LogManager objects directly.
+ **/
+
+enum LogLevel {
+  kTrace   = 0,
+  kDebug   = 1,
+  kInfo    = 2,
+  kWarning = 3,
+  kError   = 4,
+};
+
+enum LogSourceComponent {
+  kUnknown      = 1 << 0,
+  kRPC          = 1 << 1,
+  kBlockReader  = 1 << 2,
+  kFileHandle   = 1 << 3,
+  kFileSystem   = 1 << 4,
+  kAsyncRuntime = 1 << 5,
+};
+
+#define LOG_TRACE(C, MSG) do { \
+if(LogManager::ShouldLog(kTrace,C)) { \
+  LogMessage(kTrace, __FILE__, __LINE__, C) MSG; \
+}} while (0)
+
+
+#define LOG_DEBUG(C, MSG) do { \
+if(LogManager::ShouldLog(kDebug,C)) { \
+  LogMessage(kDebug, __FILE__, __LINE__, C) MSG; \
+}} while (0)
+
+#define LOG_INFO(C, MSG) do { \
+if(LogManager::ShouldLog(kInfo,C)) { \
+  LogMessage(kInfo, __FILE__, __LINE__, C) MSG; \
+}} while (0)
+
+#define LOG_WARN(C, MSG) do { \
+if(LogManager::ShouldLog(kWarning,C)) { \
+  LogMessage(kWarning, __FILE__, __LINE__, C) MSG; \
+}} while (0)
+
+#define LOG_ERROR(C, MSG) do { \
+if(LogManager::ShouldLog(kError,C)) { \
+  LogMessage(kError, __FILE__, __LINE__, C) MSG; \
+}} while (0)
+
+
+class LogMessage;
+
+class LoggerInterface {
+ public:
+  LoggerInterface() {}
+  virtual ~LoggerInterface() {}
+
+  /**
+   * User defined handling of messages; the common case would be printing somewhere.
+   **/
+  virtual void Write(const LogMessage& msg) = 0;
+};
+
+/**
+ * StderrLogger unsurprisingly dumps messages to stderr.
+ * This is the default logger if nothing else is explicitly set.
+ **/
+class StderrLogger : public LoggerInterface {
+ public:
+  StderrLogger() : show_timestamp_(true), show_level_(true),
+                   show_thread_(true), show_component_(true),
+                   show_file_(true) {}
+  void Write(const LogMessage& msg);
+  void set_show_timestamp(bool show);
+  void set_show_level(bool show);
+  void set_show_thread(bool show);
+  void set_show_component(bool show);
+ private:
+  bool show_timestamp_;
+  bool show_level_;
+  bool show_thread_;
+  bool show_component_;
+  bool show_file_;
+};
+
+
+/**
+ * LogManager provides a thread safe static interface to the underlying
+ * logger implementation.
+ **/
+class LogManager {
+ friend class LogMessage;
+ public:
+  // allow easy inlining
+  static bool ShouldLog(LogLevel level, LogSourceComponent source) {
+    std::lock_guard<std::mutex> impl_lock(impl_lock_);
+    if(level < level_threshold_)
+      return false;
+    if(!(source & component_mask_))
+      return false;
+    return true;
+  }
+  static void Write(const LogMessage & msg);
+  static void EnableLogForComponent(LogSourceComponent c);
+  static void DisableLogForComponent(LogSourceComponent c);
+  static void SetLogLevel(LogLevel level);
+  static void SetLoggerImplementation(std::unique_ptr<LoggerInterface> impl);
+
+ private:
+  // don't create instances of this
+  LogManager();
+  // synchronize all unsafe plugin calls
+  static std::mutex impl_lock_;
+  static std::unique_ptr<LoggerInterface> logger_impl_;
+  // component and level masking
+  static uint32_t component_mask_;
+  static uint32_t level_threshold_;
+};
+
+/**
+ * LogMessage contains message text, along with other metadata about the message.
+ * Note: For performance reasons a set of macros (see top of file) is used to
+ * create these inside of an if block. Do not instantiate these directly; doing
+ * so will cause the message to be unconditionally logged. This minor
+ * inconvenience gives us a ~20% performance increase in the (common) case where
+ * few messages are worth logging; std::stringstream is expensive to construct.
+ **/
+class LogMessage {
+ friend class LogManager;
+ public:
+  LogMessage(const LogLevel &l, const char *file, int line,
+             LogSourceComponent component = kUnknown) :
+      level_(l), component_(component), origin_file_(file), origin_line_(line){}
+
+  ~LogMessage();
+
+  const char *level_string() const;
+  const char *component_string() const;
+  LogLevel level() const {return level_; }
+  LogSourceComponent component() const {return component_; }
+  int file_line() const {return origin_line_; }
+  const char * file_name() const {return origin_file_; }
+
+  //print as-is, indicates when a nullptr was passed in
+  LogMessage& operator<<(const char *);
+  LogMessage& operator<<(const std::string*);
+  LogMessage& operator<<(const std::string&);
+
+  //convert to a string "true"/"false"
+  LogMessage& operator<<(bool);
+
+  //integral types
+  LogMessage& operator<<(int32_t);
+  LogMessage& operator<<(uint32_t);
+  LogMessage& operator<<(int64_t);
+  LogMessage& operator<<(uint64_t);
+
+  //print address as hex
+  LogMessage& operator<<(void *);
+
+  //asio types
+  LogMessage& operator<<(const ::asio::ip::tcp::endpoint& endpoint);
+
+  //thread and mutex types
+  LogMessage& operator<<(const std::thread::id& tid);
+
+
+  std::string MsgString() const;
+
+ private:
+  LogLevel level_;
+  LogSourceComponent component_;
+  const char *origin_file_;
+  const int origin_line_;
+  std::stringstream msg_buffer_;
+};
+
+}
+
+#endif
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org