Repository: hadoop Updated Branches: refs/heads/HDFS-8707 e658f07ad -> e7f4e6679
HDFS-10796: libhdfs++: rationalize ioservice interactions. Contributed by James Clampffer. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e7f4e667 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e7f4e667 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e7f4e667 Branch: refs/heads/HDFS-8707 Commit: e7f4e6679a9e7502cf314409c2daaeac4e5c54ef Parents: e658f07 Author: James <j...@apache.org> Authored: Fri Dec 9 18:06:06 2016 -0500 Committer: James <j...@apache.org> Committed: Fri Dec 9 18:06:06 2016 -0500 ---------------------------------------------------------------------- .../native/libhdfspp/include/hdfspp/hdfspp.h | 42 ++++++-- .../native/libhdfspp/include/hdfspp/options.h | 7 ++ .../libhdfspp/lib/common/hdfs_ioservice.cc | 99 ++++++++++++++++- .../libhdfspp/lib/common/hdfs_ioservice.h | 41 +++++++- .../main/native/libhdfspp/lib/common/logging.cc | 19 ++-- .../main/native/libhdfspp/lib/common/logging.h | 15 ++- .../main/native/libhdfspp/lib/common/options.cc | 6 +- .../main/native/libhdfspp/lib/fs/filesystem.cc | 105 ++++++++++++------- .../main/native/libhdfspp/lib/fs/filesystem.h | 19 ++-- .../main/native/libhdfspp/tests/CMakeLists.txt | 5 + .../native/libhdfspp/tests/hdfspp_mini_dfs.h | 8 +- 11 files changed, 292 insertions(+), 74 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/e7f4e667/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h index 78ad594..5fbd3d8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h +++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h @@ -39,24 +39,47 @@ namespace hdfs { * When an operation is queued into an IoService, the IoService will * run the callback handler associated with the operation. Note that * the IoService must be stopped before destructing the objects that - * file the operations. + * post the operations. * - * From an implementation point of view the IoService object wraps the - * ::asio::io_service objects. Please see the related documentation - * for more details. + * From an implementation point of view the hdfs::IoService provides + * a thin wrapper over an asio::io_service object so that additional + * instrumentation and functionality can be added. **/ -class IoService { + +class IoService : public std::enable_shared_from_this<IoService> +{ public: static IoService *New(); + static std::shared_ptr<IoService> MakeShared(); + virtual ~IoService(); + + /** + * Start up as many threads as there are logical processors. + * Return number of threads created. + **/ + virtual unsigned int InitDefaultWorkers() = 0; + + /** + * Initialize with thread_count handler threads. + * If thread count is less than one print a log message and default to one thread. + * Return number of threads created. + **/ + virtual unsigned int InitWorkers(unsigned int thread_count) = 0; + + /** + * Place an item on the execution queue. Will be invoked from outside of the calling context. + **/ + virtual void PostTask(std::function<void(void)>& asyncTask) = 0; + /** * Run the asynchronous tasks associated with this IoService. **/ virtual void Run() = 0; /** * Stop running asynchronous tasks associated with this IoService. + * All worker threads will return as soon as they finish executing their current task. 
**/ virtual void Stop() = 0; - virtual ~IoService(); }; /** @@ -164,6 +187,13 @@ class FileSystem { IoService *&io_service, const std::string &user_name, const Options &options); /** + * Works the same as the other FileSystem::New but takes a copy of an existing IoService. + * The shared IoService is expected to already have worker threads initialized. + **/ + static FileSystem *New( + std::shared_ptr<IoService>, const std::string &user_name, const Options &options); + + /** * Returns a new instance with default user and option, with the default IOService. **/ static FileSystem *New(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/e7f4e667/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/options.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/options.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/options.h index b73a729..456390f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/options.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/options.h @@ -123,6 +123,13 @@ struct Options { long block_size; static const long kDefaultBlockSize = 128*1024*1024; + /** + * Asio worker thread count + * default: -1, indicates number of hardware threads + **/ + int io_threads_; + static const int kDefaultIoThreads = -1; + Options(); }; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e7f4e667/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.cc
index f6876cd..578b782 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.cc @@ -18,14 +18,105 @@ #include "hdfs_ioservice.h" +#include <thread> +#include <mutex> +#include <vector> + #include "common/logging.h" namespace hdfs { IoService::~IoService() {} -IoService *IoService::New() { return new IoServiceImpl(); } +IoService *IoService::New() { + return new IoServiceImpl(); +} + +std::shared_ptr<IoService> IoService::MakeShared() { + return std::make_shared<IoServiceImpl>(); +} + + +unsigned int IoServiceImpl::InitDefaultWorkers() { + LOG_TRACE(kAsyncRuntime, << "IoServiceImpl::InitDefaultWorkers@" << this << " called."); + unsigned int logical_thread_count = std::thread::hardware_concurrency(); +#ifndef DISABLE_CONCURRENT_WORKERS + if(logical_thread_count < 1) { + LOG_WARN(kAsyncRuntime, << "IoServiceImpl::InitDefaultWorkers did not detect any logical processors. Defaulting to 1 worker thread."); + } else { + LOG_DEBUG(kAsyncRuntime, << "IoServiceImpl::InitDefaultWorkers detected " << logical_thread_count << " logical threads and will spawn a worker for each."); + } +#else + if(logical_thread_count > 0) { + LOG_DEBUG(kAsyncRuntime, << "IoServiceImpl::InitDefaultWorkers: " << logical_thread_count << " threads available.
Concurrent workers are disabled so 1 worker thread will be used"); + } + logical_thread_count = 1; +#endif + return InitWorkers(logical_thread_count > 0 ? logical_thread_count : 1); // hardware_concurrency() may report 0 +} + +unsigned int IoServiceImpl::InitWorkers(unsigned int thread_count) { +#ifdef DISABLE_CONCURRENT_WORKERS + LOG_DEBUG(kAsyncRuntime, << "IoServiceImpl::InitWorkers: " << thread_count << " threads specified but concurrent workers are disabled so 1 will be used"); + thread_count = 1; +#endif + unsigned int created_threads = 0; + for(unsigned int i=0; i<thread_count; i++) { + bool created = AddWorkerThread(); + if(created) { + created_threads++; + } else { + LOG_DEBUG(kAsyncRuntime, << "IoServiceImpl@" << this << " ::InitWorkers failed to create a worker thread"); + } + } + if(created_threads != thread_count) { + LOG_WARN(kAsyncRuntime, << "IoServiceImpl@" << this << " ::InitWorkers attempted to create " + << thread_count << " but only created " << created_threads + << " worker threads. Make sure this process has adequate resources."); + } + return created_threads; +} +bool IoServiceImpl::AddWorkerThread() { + mutex_guard state_lock(state_lock_); + auto async_worker = [this]() { + this->ThreadStartHook(); + this->Run(); + this->ThreadExitHook(); + }; + worker_threads_.push_back(WorkerPtr( new std::thread(async_worker)) ); + return true; +} + + +void IoServiceImpl::ThreadStartHook() { + mutex_guard state_lock(state_lock_); + LOG_DEBUG(kAsyncRuntime, << "Worker thread #" << std::this_thread::get_id() << " for IoServiceImpl@" << this << " starting"); +} + +void IoServiceImpl::ThreadExitHook() { + mutex_guard state_lock(state_lock_); + LOG_DEBUG(kAsyncRuntime, << "Worker thread #" << std::this_thread::get_id() << " for IoServiceImpl@" << this << " exiting"); +} + +void IoServiceImpl::PostTask(std::function<void(void)>& asyncTask) { + io_service_.post(asyncTask); +} + +void IoServiceImpl::WorkerDeleter::operator()(std::thread *t) { + // It is far too easy to destroy the filesystem (and thus the threadpool) + // from
within one of the worker threads, leading to a deadlock. Let's + // provide some explicit protection. + if(t->get_id() == std::this_thread::get_id()) { + LOG_ERROR(kAsyncRuntime, << "IoServiceImpl::WorkerDeleter::operator(threadptr=" + << t << ") : FATAL: Attempted to destroy a thread pool " + "from within a callback of the thread pool!"); + } + t->join(); + delete t; +} + +// As long as this just forwards to an asio::io_service method it doesn't need a lock void IoServiceImpl::Run() { // The IoService executes callbacks provided by library users in the context of worker threads, // there is no way of preventing those callbacks from throwing but we can at least prevent them @@ -33,7 +124,7 @@ void IoServiceImpl::Run() { // As recommended in http://www.boost.org/doc/libs/1_39_0/doc/html/boost_asio/reference/io_service.html#boost_asio.reference.io_service.effect_of_exceptions_thrown_from_handlers asio::io_service::work work(io_service_); - for(;;) + while(true) { try { @@ -47,5 +138,9 @@ void IoServiceImpl::Run() { } } +unsigned int IoServiceImpl::get_worker_thread_count() { + mutex_guard state_lock(state_lock_); + return worker_threads_.size(); +} } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e7f4e667/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.h index 73d167e..294252b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.h @@ -22,23 +22,56 @@ #include "hdfspp/hdfspp.h" #include <asio/io_service.hpp> +#include "common/util.h" + +#include <mutex> +#include <thread>
+#include <thread> namespace hdfs { +// While this is defined each InitDefaultWorkers/InitWorkers call starts a single worker thread. +// Comment it out to enable concurrent workers; if tests still fail with it defined it's most likely a logic bug +#define DISABLE_CONCURRENT_WORKERS + /* - * A thin wrapper over the asio::io_service. - * -In the future this could own the worker threads that execute io tasks which - * makes it easier to share IoServices between FileSystems. See HDFS-10796 for - * rationale. + * A thin wrapper over the asio::io_service with a few extras + * -manages its own worker threads + * -some helpers for sharing with multiple modules that need to do async work */ class IoServiceImpl : public IoService { public: + IoServiceImpl() {} + + virtual unsigned int InitDefaultWorkers() override; + virtual unsigned int InitWorkers(unsigned int thread_count) override; + virtual void PostTask(std::function<void(void)>& asyncTask) override; virtual void Run() override; virtual void Stop() override { io_service_.stop(); } + + // Add a single worker thread, in the common case try to avoid this in favor + // of Init[Default]Workers. Public for use by tests and rare cases where a + // client wants very explicit control of threading for performance reasons + // e.g. pinning threads to NUMA nodes.
+ bool AddWorkerThread(); + + // Be very careful about using this: HDFS-10241 ::asio::io_service &io_service() { return io_service_; } + unsigned int get_worker_thread_count(); private: + std::mutex state_lock_; ::asio::io_service io_service_; + + // For doing logging + resource manager updates on thread start/exit + void ThreadStartHook(); + void ThreadExitHook(); + + // Support for async worker threads + struct WorkerDeleter { + void operator()(std::thread *t); + }; + typedef std::unique_ptr<std::thread, WorkerDeleter> WorkerPtr; + std::vector<WorkerPtr> worker_threads_; }; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e7f4e667/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.cc index ac1c336..94bce83 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.cc @@ -183,6 +183,11 @@ LogMessage& LogMessage::operator<<(void *ptr) { } +LogMessage& LogMessage::operator<<(const std::thread::id& tid) { + msg_buffer_ << tid; + return *this; +} + std::string LogMessage::MsgString() const { return msg_buffer_.str(); } @@ -199,12 +204,13 @@ const char * LogMessage::level_string() const { return kLevelStrings[level_]; } -const char * kComponentStrings[5] = { - "[Unknown ]", - "[RPC ]", - "[BlockReader ]", - "[FileHandle ]", - "[FileSystem ]" +const char * kComponentStrings[6] = { + "[Unknown ]", + "[RPC ]", + "[BlockReader ]", + "[FileHandle ]", + "[FileSystem ]", + "[Async Runtime ]", }; const char * LogMessage::component_string() const { @@ -213,6 +219,7 @@ const char * LogMessage::component_string() const { case
kBlockReader: return kComponentStrings[2]; case kFileHandle: return kComponentStrings[3]; case kFileSystem: return kComponentStrings[4]; + case kAsyncRuntime: return kComponentStrings[5]; default: return kComponentStrings[0]; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e7f4e667/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.h index f807fc4..69f9c6e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.h @@ -27,6 +27,7 @@ #include <sstream> #include <mutex> #include <memory> +#include <thread> #include <asio/ip/tcp.hpp> @@ -49,11 +50,12 @@ enum LogLevel { }; enum LogSourceComponent { - kUnknown = 1 << 0, - kRPC = 1 << 1, - kBlockReader = 1 << 2, - kFileHandle = 1 << 3, - kFileSystem = 1 << 4, + kUnknown = 1 << 0, + kRPC = 1 << 1, + kBlockReader = 1 << 2, + kFileHandle = 1 << 3, + kFileSystem = 1 << 4, + kAsyncRuntime = 1 << 5, }; #define LOG_TRACE(C, MSG) do { \ @@ -196,6 +198,9 @@ class LogMessage { //asio types LogMessage& operator<<(const ::asio::ip::tcp::endpoint& endpoint); + //thread and mutex types + LogMessage& operator<<(const std::thread::id& tid); + std::string MsgString() const; http://git-wip-us.apache.org/repos/asf/hadoop/blob/e7f4e667/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc index a889be5..728082e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc @@ -39,7 +39,11 @@ Options::Options() : rpc_timeout(kDefaultRpcTimeout), failover_max_retries(kDefaultFailoverMaxRetries), failover_connection_max_retries(kDefaultFailoverConnectionMaxRetries), authentication(kDefaultAuthentication), - block_size(kDefaultBlockSize) {} + block_size(kDefaultBlockSize), + io_threads_(kDefaultIoThreads) +{ + +} std::string NamenodeInfo::get_host() const { return uri.get_host(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/e7f4e667/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc index d805716..a5f3aad 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc @@ -79,11 +79,20 @@ FileSystem *FileSystem::New( return new FileSystemImpl(io_service, user_name, options); } +FileSystem *FileSystem::New( + std::shared_ptr<IoService> io_service, const std::string &user_name, const Options &options) { + return new FileSystemImpl(io_service, user_name, options); +} + FileSystem *FileSystem::New() { // No, this pointer won't be leaked. The FileSystem takes ownership.
- IoService *io_service = IoService::New(); + std::shared_ptr<IoService> io_service = IoService::MakeShared(); if(!io_service) return nullptr; + int thread_count = io_service->InitDefaultWorkers(); + if(thread_count < 1) + return nullptr; + std::string user_name = get_effective_user_name(""); Options options; return new FileSystemImpl(io_service, user_name, options); @@ -123,25 +132,56 @@ const std::string get_effective_user_name(const std::string &user_name) { } FileSystemImpl::FileSystemImpl(IoService *&io_service, const std::string &user_name, const Options &options) : - options_(options), client_name_(GetRandomClientName()), io_service_( - static_cast<IoServiceImpl *>(io_service)), - nn_( - &io_service_->io_service(), options, client_name_, - get_effective_user_name(user_name), kNamenodeProtocol, - kNamenodeProtocolVersion - ), bad_node_tracker_(std::make_shared<BadDataNodeTracker>()), - event_handlers_(std::make_shared<LibhdfsEvents>()) { - - LOG_TRACE(kFileSystem, << "FileSystemImpl::FileSystemImpl(" + io_service_(static_cast<IoServiceImpl *>(io_service)), options_(options), + client_name_(GetRandomClientName()), + nn_( + &io_service_->io_service(), options, client_name_, + get_effective_user_name(user_name), kNamenodeProtocol, + kNamenodeProtocolVersion + ), + bad_node_tracker_(std::make_shared<BadDataNodeTracker>()), + event_handlers_(std::make_shared<LibhdfsEvents>()) +{ + + LOG_DEBUG(kFileSystem, << "FileSystemImpl::FileSystemImpl(" << FMT_THIS_ADDR << ") called"); // Poor man's move io_service = nullptr; - /* spawn background threads for asio delegation */ - unsigned int threads = 1 /* options.io_threads_, pending HDFS-9117 */; - for (unsigned int i = 0; i < threads; i++) { - AddWorkerThread(); + unsigned int running_workers = 0; + if(options.io_threads_ < 1) { + LOG_DEBUG(kFileSystem, << "FileSystemImpl::FileSystemImpl Initializing default number of worker threads"); + running_workers = io_service_->InitDefaultWorkers(); + } else { +
LOG_DEBUG(kFileSystem, << "FileSystemImpl::FileSystemImpl Initializing " << options_.io_threads_ << " worker threads."); + running_workers = io_service_->InitWorkers(options_.io_threads_); // the io_service argument was nulled above + } + + if(running_workers < 1) { + LOG_WARN(kFileSystem, << "FileSystemImpl::FileSystemImpl was unable to start worker threads"); + } +} + +FileSystemImpl::FileSystemImpl(std::shared_ptr<IoService> io_service, const std::string& user_name, const Options &options) : + io_service_(std::static_pointer_cast<IoServiceImpl>(io_service)), options_(options), + client_name_(GetRandomClientName()), + nn_( + &io_service_->io_service(), options, client_name_, + get_effective_user_name(user_name), kNamenodeProtocol, + kNamenodeProtocolVersion + ), + bad_node_tracker_(std::make_shared<BadDataNodeTracker>()), + event_handlers_(std::make_shared<LibhdfsEvents>()) +{ + LOG_DEBUG(kFileSystem, << "FileSystemImpl::FileSystemImpl(" + << FMT_THIS_ADDR << ", shared IoService@" << io_service_.get() << ") called"); + int worker_thread_count = io_service_->get_worker_thread_count(); + if(worker_thread_count < 1) { + LOG_WARN(kFileSystem, << "FileSystemImpl::FileSystemImpl IoService provided doesn't have any worker threads. " + << "It needs at least 1 worker to connect to an HDFS cluster."); + } else { + LOG_DEBUG(kFileSystem, << "FileSystemImpl::FileSystemImpl using " << worker_thread_count << " worker threads."); } } @@ -154,7 +194,6 @@ FileSystemImpl::~FileSystemImpl() { * Once worker threads are joined and deleted the service can be deleted. **/ io_service_->Stop(); - worker_threads_.clear(); } void FileSystemImpl::Connect(const std::string &server, @@ -230,12 +269,21 @@ void FileSystemImpl::ConnectToDefaultFs(const std::function<void(const Status &, int FileSystemImpl::AddWorkerThread() { LOG_DEBUG(kFileSystem, << "FileSystemImpl::AddWorkerThread(" << FMT_THIS_ADDR << ") called."
- << " Existing thread count = " << worker_threads_.size()); + << " Existing thread count = " << WorkerThreadCount()); + + if(!io_service_) + return -1; - auto service_task = [](IoService *service) { service->Run(); }; - worker_threads_.push_back( - WorkerPtr(new std::thread(service_task, io_service_.get()))); - return worker_threads_.size(); + io_service_->AddWorkerThread(); + return WorkerThreadCount(); +} + +int FileSystemImpl::WorkerThreadCount() { + if(!io_service_) { + return -1; + } else { + return io_service_->get_worker_thread_count(); + } } void FileSystemImpl::Open( @@ -714,21 +762,6 @@ void FileSystemImpl::DisallowSnapshot(const std::string &path, nn_.DisallowSnapshot(path, handler); } - -void FileSystemImpl::WorkerDeleter::operator()(std::thread *t) { - // It is far too easy to destroy the filesystem (and thus the threadpool) - // from within one of the worker threads, leading to a deadlock. Let's - // provide some explicit protection. - if(t->get_id() == std::this_thread::get_id()) { - LOG_ERROR(kFileSystem, << "FileSystemImpl::WorkerDeleter::operator(treadptr=" - << t << ") : FATAL: Attempted to destroy a thread pool" - "from within a callback of the thread pool!"); - } - t->join(); - delete t; -} - - void FileSystemImpl::SetFsEventCallback(fs_event_callback callback) { if (event_handlers_) { event_handlers_->set_fs_callback(callback); http://git-wip-us.apache.org/repos/asf/hadoop/blob/e7f4e667/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h index 0e9cedd..80978cf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h +++
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h @@ -46,7 +46,8 @@ namespace hdfs { class FileSystemImpl : public FileSystem { public: MEMCHECKED_CLASS(FileSystemImpl) - FileSystemImpl(IoService *&io_service, const std::string& user_name, const Options &options); + explicit FileSystemImpl(IoService *&io_service, const std::string& user_name, const Options &options); + explicit FileSystemImpl(std::shared_ptr<IoService>, const std::string& user_name, const Options &options); ~FileSystemImpl() override; /* attempt to connect to namenode, return bad status on failure */ @@ -176,7 +177,7 @@ public: int AddWorkerThread(); /* how many worker threads are servicing asio requests */ - int WorkerThreadCount() { return worker_threads_.size(); } + int WorkerThreadCount(); /* all monitored events will need to lookup handlers */ std::shared_ptr<LibhdfsEvents> get_event_handlers(); @@ -184,24 +185,18 @@ public: Options get_options(); private: - const Options options_; - const std::string client_name_; - std::string cluster_name_; /** * The IoService must be the first member variable to ensure that it gets * destroyed last. This allows other members to dequeue things from the * service in their own destructors. **/ - std::unique_ptr<IoServiceImpl> io_service_; + std::shared_ptr<IoServiceImpl> io_service_; + const Options options_; + const std::string client_name_; + std::string cluster_name_; NameNodeOperations nn_; std::shared_ptr<BadDataNodeTracker> bad_node_tracker_; - struct WorkerDeleter { - void operator()(std::thread *t); - }; - typedef std::unique_ptr<std::thread, WorkerDeleter> WorkerPtr; - std::vector<WorkerPtr> worker_threads_; - /** * Runtime event monitoring handlers.
* Note: This is really handy to have for advanced usage but http://git-wip-us.apache.org/repos/asf/hadoop/blob/e7f4e667/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt index 3dd4aae..395fad5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt @@ -112,6 +112,11 @@ add_executable(logging_test logging_test.cc) target_link_libraries(logging_test common gmock_main bindings_c fs rpc proto common reader connection ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} ${SASL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT}) add_memcheck_test(logging_test logging_test) +add_executable(hdfs_ioservice_test hdfs_ioservice_test.cc) +target_link_libraries(hdfs_ioservice_test fs gmock_main common ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} ${SASL_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT}) +add_memcheck_test(hdfs_ioservice hdfs_ioservice_test) + + # # # INTEGRATION TESTS - TESTS THE FULL LIBRARY AGAINST ACTUAL SERVERS http://git-wip-us.apache.org/repos/asf/hadoop/blob/e7f4e667/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfspp_mini_dfs.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfspp_mini_dfs.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfspp_mini_dfs.h index 3ec58e1..aecced1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfspp_mini_dfs.h +++
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfspp_mini_dfs.h @@ -113,7 +113,7 @@ private: class MiniCluster { public: - MiniCluster() : io_service(IoService::New()) { + MiniCluster() : io_service(IoService::MakeShared()) { struct NativeMiniDfsConf conf = { 1, /* doFormat */ 0, /* webhdfs */ @@ -137,6 +137,10 @@ public: // Connect via the C++ API FSHandle connect(const std::string username) { Options options; + + unsigned int worker_count = io_service->InitDefaultWorkers(); + EXPECT_NE(0u, worker_count); + FileSystem * fs = FileSystem::New(io_service, username, options); EXPECT_NE(nullptr, fs); FSHandle result(fs); @@ -184,7 +188,7 @@ public: protected: struct NativeMiniDfsCluster* clusterInfo; - IoService * io_service; + std::shared_ptr<IoService> io_service; }; } // namespace --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org