Repository: hadoop
Updated Branches:
  refs/heads/HDFS-8707 e658f07ad -> e7f4e6679


HDFS-10796: libhdfs++: rationalize ioservice interactions. Contributed by James 
Clampffer.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e7f4e667
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e7f4e667
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e7f4e667

Branch: refs/heads/HDFS-8707
Commit: e7f4e6679a9e7502cf314409c2daaeac4e5c54ef
Parents: e658f07
Author: James <j...@apache.org>
Authored: Fri Dec 9 18:06:06 2016 -0500
Committer: James <j...@apache.org>
Committed: Fri Dec 9 18:06:06 2016 -0500

----------------------------------------------------------------------
 .../native/libhdfspp/include/hdfspp/hdfspp.h    |  42 ++++++--
 .../native/libhdfspp/include/hdfspp/options.h   |   7 ++
 .../libhdfspp/lib/common/hdfs_ioservice.cc      |  99 ++++++++++++++++-
 .../libhdfspp/lib/common/hdfs_ioservice.h       |  41 +++++++-
 .../main/native/libhdfspp/lib/common/logging.cc |  19 ++--
 .../main/native/libhdfspp/lib/common/logging.h  |  15 ++-
 .../main/native/libhdfspp/lib/common/options.cc |   6 +-
 .../main/native/libhdfspp/lib/fs/filesystem.cc  | 105 ++++++++++++-------
 .../main/native/libhdfspp/lib/fs/filesystem.h   |  19 ++--
 .../main/native/libhdfspp/tests/CMakeLists.txt  |   5 +
 .../native/libhdfspp/tests/hdfspp_mini_dfs.h    |   8 +-
 11 files changed, 292 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e7f4e667/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h
index 78ad594..5fbd3d8 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h
@@ -39,24 +39,47 @@ namespace hdfs {
  * When an operation is queued into an IoService, the IoService will
  * run the callback handler associated with the operation. Note that
  * the IoService must be stopped before destructing the objects that
- * file the operations.
+ * post the operations.
  *
- * From an implementation point of view the IoService object wraps the
- * ::asio::io_service objects. Please see the related documentation
- * for more details.
+ * From an implementation point of view the hdfs::IoService provides
+ * a thin wrapper over an asio::io_service object so that additional
+ * instrumentation and functionality can be added.
  **/
-class IoService {
+
+class IoService : public std::enable_shared_from_this<IoService>
+{
  public:
   static IoService *New();
+  static std::shared_ptr<IoService> MakeShared();
+  virtual ~IoService();
+
+  /**
+   * Start up as many threads as there are logical processors.
+   * Return number of threads created.
+   **/
+  virtual unsigned int InitDefaultWorkers() = 0;
+
+  /**
+   * Initialize with thread_count handler threads.
+   * If thread count is less than one print a log message and default to one 
thread.
+   * Return number of threads created.
+   **/
+  virtual unsigned int InitWorkers(unsigned int thread_count) = 0;
+
+  /**
+   * Place an item on the execution queue.  Will be invoked from outside of 
the calling context.
+   **/
+  virtual void PostTask(std::function<void(void)>& asyncTask) = 0;
+
   /**
    * Run the asynchronous tasks associated with this IoService.
    **/
   virtual void Run() = 0;
   /**
    * Stop running asynchronous tasks associated with this IoService.
+   * All worker threads will return as soon as they finish executing their 
current task.
    **/
   virtual void Stop() = 0;
-  virtual ~IoService();
 };
 
 /**
@@ -164,6 +187,13 @@ class FileSystem {
       IoService *&io_service, const std::string &user_name, const Options 
&options);
 
   /**
+   * Works the same as the other FileSystem::New but takes a copy of an 
existing IoService.
+   * The shared IoService is expected to already have worker threads 
initialized.
+   **/
+  static FileSystem *New(
+      std::shared_ptr<IoService>, const std::string &user_name, const Options 
&options);
+
+  /**
    * Returns a new instance with default user and option, with the default 
IOService.
    **/
   static FileSystem *New();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e7f4e667/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/options.h
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/options.h
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/options.h
index b73a729..456390f 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/options.h
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/options.h
@@ -123,6 +123,13 @@ struct Options {
   long block_size;
   static const long kDefaultBlockSize = 128*1024*1024;
 
+  /**
+   * Asio worker thread count
+   * default: -1, indicates number of hardware threads
+   **/
+  int io_threads_;
+  static const int kDefaultIoThreads = -1;
+
   Options();
 };
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e7f4e667/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.cc
index f6876cd..578b782 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.cc
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.cc
@@ -18,14 +18,105 @@
 
 #include "hdfs_ioservice.h"
 
+#include <mutex>
+#include <system_error>
+#include <thread>
+#include <vector>
+
 #include "common/logging.h"
 
 namespace hdfs {
 
 IoService::~IoService() {}
 
-IoService *IoService::New() { return new IoServiceImpl(); }
+// Allocates an IoServiceImpl with new and returns it as a raw IoService pointer.
+IoService *IoService::New() {
+  return new IoServiceImpl();
+}
+
+// Creates an IoServiceImpl with std::make_shared and returns it as a
+// std::shared_ptr<IoService>.
+std::shared_ptr<IoService> IoService::MakeShared() {
+  return std::make_shared<IoServiceImpl>();
+}
+
+
+// Start one worker thread per logical processor reported by
+// std::thread::hardware_concurrency().  A single worker is used instead when
+// no processor count is reported or when DISABLE_CONCURRENT_WORKERS is defined.
+// Returns the number of worker threads created by InitWorkers.
+unsigned int IoServiceImpl::InitDefaultWorkers() {
+  LOG_TRACE(kAsyncRuntime, << "IoServiceImpl::InitDefaultWorkers@" << this << " called.");
+  unsigned int logical_thread_count = std::thread::hardware_concurrency();
+#ifndef DISABLE_CONCURRENT_WORKERS
+  if(logical_thread_count < 1) {
+    LOG_WARN(kAsyncRuntime, << "IoServiceImpl::InitDefaultWorkers did not detect any logical processors.  Defaulting to 1 worker thread.");
+    // Actually apply the default that the message announces; without this
+    // InitWorkers would be asked for zero threads.
+    logical_thread_count = 1;
+  } else {
+    LOG_DEBUG(kAsyncRuntime, << "IoServiceImpl::InitDefaultWorkers detected " << logical_thread_count << " logical threads and will spawn a worker for each.");
+  }
+#else
+  if(logical_thread_count > 0) {
+    LOG_DEBUG(kAsyncRuntime, << "IoServiceImpl::InitDefaultWorkers: " << logical_thread_count << " threads available.  Concurrent workers are disabled so 1 worker thread will be used");
+  }
+  logical_thread_count = 1;
+#endif
+  return InitWorkers(logical_thread_count);
+}
+
+// Start thread_count worker threads for this IoService.
+// A thread_count below one is logged and treated as one, matching the
+// contract documented for IoService::InitWorkers in hdfspp.h.  When
+// DISABLE_CONCURRENT_WORKERS is defined a single worker is used regardless
+// of thread_count.
+// Returns the number of worker threads that were actually created.
+unsigned int IoServiceImpl::InitWorkers(unsigned int thread_count) {
+  if(thread_count < 1) {
+    LOG_WARN(kAsyncRuntime, << "IoServiceImpl@" << this << " ::InitWorkers called with a thread count less than one.  Defaulting to 1 worker thread.");
+    thread_count = 1;
+  }
+  // The macro name must match the one defined in hdfs_ioservice.h and tested
+  // in InitDefaultWorkers, otherwise this guard is silently compiled out.
+#ifdef DISABLE_CONCURRENT_WORKERS
+  LOG_DEBUG(kAsyncRuntime, << "IoServiceImpl::InitWorkers: " << thread_count << " threads specified but concurrent workers are disabled so 1 will be used");
+  thread_count = 1;
+#endif
+  unsigned int created_threads = 0;
+  for(unsigned int i=0; i<thread_count; i++) {
+    bool created = AddWorkerThread();
+    if(created) {
+      created_threads++;
+    } else {
+      LOG_DEBUG(kAsyncRuntime, << "IoServiceImpl@" << this << " ::InitWorkers failed to create a worker thread");
+    }
+  }
+  if(created_threads != thread_count) {
+    LOG_WARN(kAsyncRuntime, << "IoServiceImpl@" << this << " ::InitWorkers attempted to create "
+                            << thread_count << " but only created " << created_threads
+                            << " worker threads.  Make sure this process has adequate resources.");
+  }
+  return created_threads;
+}
 
+// Spawn one worker thread and record it in worker_threads_.  The thread calls
+// ThreadStartHook(), then Run(), then ThreadExitHook().
+// Returns true when the thread was started and false when the thread could
+// not be created.
+bool IoServiceImpl::AddWorkerThread() {
+  mutex_guard state_lock(state_lock_);
+  auto async_worker = [this]() {
+    this->ThreadStartHook();
+    this->Run();
+    this->ThreadExitHook();
+  };
+  try {
+    worker_threads_.push_back(WorkerPtr( new std::thread(async_worker)) );
+  } catch (const std::system_error &) {
+    // std::thread's constructor throws std::system_error when the thread
+    // cannot be started.  Report that through the return value so that
+    // InitWorkers can count and log the failure.
+    return false;
+  }
+  return true;
+}
+
+
+// Invoked by each worker thread (see AddWorkerThread) before it calls Run().
+// Logs the calling thread's id at debug level while holding a mutex_guard on
+// state_lock_.
+void IoServiceImpl::ThreadStartHook() {
+  mutex_guard state_lock(state_lock_);
+  LOG_DEBUG(kAsyncRuntime, << "Worker thread #" << std::this_thread::get_id() 
<< " for IoServiceImpl@" << this << " starting");
+}
+
+// Invoked by each worker thread (see AddWorkerThread) after Run() returns.
+// Logs the calling thread's id at debug level while holding a mutex_guard on
+// state_lock_.
+void IoServiceImpl::ThreadExitHook() {
+  mutex_guard state_lock(state_lock_);
+  LOG_DEBUG(kAsyncRuntime, << "Worker thread #" << std::this_thread::get_id() 
<< " for IoServiceImpl@" << this << " exiting");
+}
+
+// Queues asyncTask on the wrapped asio::io_service by forwarding it to post().
+void IoServiceImpl::PostTask(std::function<void(void)>& asyncTask) {
+  io_service_.post(asyncTask);
+}
+
+// Deleter used by the WorkerPtr entries in worker_threads_: joins the worker
+// thread and then frees the std::thread object.
+void IoServiceImpl::WorkerDeleter::operator()(std::thread *t) {
+  // It is far too easy to destroy the filesystem (and thus the threadpool)
+  //     from within one of the worker threads, leading to a deadlock.  Let's
+  //     provide some explicit protection.
+  if(t->get_id() == std::this_thread::get_id()) {
+    LOG_ERROR(kAsyncRuntime, << "IoServiceImpl::WorkerDeleter::operator(threadptr="
+                             << t << ") : FATAL: Attempted to destroy a thread pool "
+                             "from within a callback of the thread pool!");
+  }
+  // NOTE(review): when the check above fires, join() is being called on the
+  //     current thread, which std::thread reports as an error
+  //     (resource_deadlock_would_occur).  The log line only makes the cause
+  //     visible; it does not make this path recoverable.
+  t->join();
+  delete t;
+}
+
+// As long as this just forwards to an asio::io_service method it doesn't need 
a lock
 void IoServiceImpl::Run() {
   // The IoService executes callbacks provided by library users in the context 
of worker threads,
   // there is no way of preventing those callbacks from throwing but we can at 
least prevent them
@@ -33,7 +124,7 @@ void IoServiceImpl::Run() {
 
   // As recommended in 
http://www.boost.org/doc/libs/1_39_0/doc/html/boost_asio/reference/io_service.html#boost_asio.reference.io_service.effect_of_exceptions_thrown_from_handlers
   asio::io_service::work work(io_service_);
-  for(;;)
+  while(true)
   {
     try
     {
@@ -47,5 +138,9 @@ void IoServiceImpl::Run() {
   }
 }
 
+// Returns the number of entries in worker_threads_, read while holding a
+// mutex_guard on state_lock_.
+unsigned int IoServiceImpl::get_worker_thread_count() {
+  mutex_guard state_lock(state_lock_);
+  return worker_threads_.size();
+}
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e7f4e667/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.h
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.h
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.h
index 73d167e..294252b 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.h
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.h
@@ -22,23 +22,56 @@
 #include "hdfspp/hdfspp.h"
 
 #include <asio/io_service.hpp>
+#include "common/util.h"
+
+#include <mutex>
+#include <thread>
 
 namespace hdfs {
 
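+// Define this to determine if issues are due to concurrency or logic faults.
+// While it is defined, InitDefaultWorkers starts a single worker thread;
+// comment it out to run one worker per logical processor.
+// If tests still fail with concurrency disabled it's most likely a logic bug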
+#define DISABLE_CONCURRENT_WORKERS
+
 /*
- *  A thin wrapper over the asio::io_service.
- *    -In the future this could own the worker threads that execute io tasks 
which
- *     makes it easier to share IoServices between FileSystems. See HDFS-10796 
for
- *     rationale.
+ *  A thin wrapper over the asio::io_service with a few extras
+ *    -manages its own worker threads
+ *    -some helpers for sharing with multiple modules that need to do async 
work
  */
 
 class IoServiceImpl : public IoService {
  public:
+  IoServiceImpl() {}
+
+  virtual unsigned int InitDefaultWorkers() override;
+  virtual unsigned int InitWorkers(unsigned int thread_count) override;
+  virtual void PostTask(std::function<void(void)>& asyncTask) override;
   virtual void Run() override;
   virtual void Stop() override { io_service_.stop(); }
+
+  // Add a single worker thread, in the common case try to avoid this in favor
+  // of Init[Default]Workers. Public for use by tests and rare cases where a
+  // client wants very explicit control of threading for performance reasons
+  // e.g. pinning threads to NUMA nodes.
+  bool AddWorkerThread();
+
+  // Be very careful about using this: HDFS-10241
   ::asio::io_service &io_service() { return io_service_; }
+  unsigned int get_worker_thread_count();
  private:
+  std::mutex state_lock_;
   ::asio::io_service io_service_;
+
+  // For doing logging + resource manager updates on thread start/exit
+  void ThreadStartHook();
+  void ThreadExitHook();
+
+  // Support for async worker threads
+  struct WorkerDeleter {
+    void operator()(std::thread *t);
+  };
+  typedef std::unique_ptr<std::thread, WorkerDeleter> WorkerPtr;
+  std::vector<WorkerPtr> worker_threads_;
 };
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e7f4e667/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.cc
index ac1c336..94bce83 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.cc
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.cc
@@ -183,6 +183,11 @@ LogMessage& LogMessage::operator<<(void *ptr) {
 }
 
 
+// Appends a std::thread::id to the message buffer using the id's stream
+// insertion operator and returns this message to allow chaining.
+LogMessage& LogMessage::operator<<(const std::thread::id& tid) {
+  msg_buffer_ << tid;
+  return *this;
+}
+
 std::string LogMessage::MsgString() const {
   return msg_buffer_.str();
 }
@@ -199,12 +204,13 @@ const char * LogMessage::level_string() const {
   return kLevelStrings[level_];
 }
 
-const char * kComponentStrings[5] = {
-  "[Unknown     ]",
-  "[RPC         ]",
-  "[BlockReader ]",
-  "[FileHandle  ]",
-  "[FileSystem  ]"
+const char * kComponentStrings[6] = {
+  "[Unknown       ]",
+  "[RPC           ]",
+  "[BlockReader   ]",
+  "[FileHandle    ]",
+  "[FileSystem    ]",
+  "[Async Runtime ]",
 };
 
 const char * LogMessage::component_string() const {
@@ -213,6 +219,7 @@ const char * LogMessage::component_string() const {
     case kBlockReader: return kComponentStrings[2];
     case kFileHandle: return kComponentStrings[3];
     case kFileSystem: return kComponentStrings[4];
+    case kAsyncRuntime: return kComponentStrings[5];
     default: return kComponentStrings[0];
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e7f4e667/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.h
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.h
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.h
index f807fc4..69f9c6e 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.h
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.h
@@ -27,6 +27,7 @@
 #include <sstream>
 #include <mutex>
 #include <memory>
+#include <thread>
 
 #include <asio/ip/tcp.hpp>
 
@@ -49,11 +50,12 @@ enum LogLevel {
 };
 
 enum LogSourceComponent {
-  kUnknown     = 1 << 0,
-  kRPC         = 1 << 1,
-  kBlockReader = 1 << 2,
-  kFileHandle  = 1 << 3,
-  kFileSystem  = 1 << 4,
+  kUnknown      = 1 << 0,
+  kRPC          = 1 << 1,
+  kBlockReader  = 1 << 2,
+  kFileHandle   = 1 << 3,
+  kFileSystem   = 1 << 4,
+  kAsyncRuntime = 1 << 5,
 };
 
 #define LOG_TRACE(C, MSG) do { \
@@ -196,6 +198,9 @@ class LogMessage {
   //asio types
   LogMessage& operator<<(const ::asio::ip::tcp::endpoint& endpoint);
 
+  //thread and mutex types
+  LogMessage& operator<<(const std::thread::id& tid);
+
 
   std::string MsgString() const;
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e7f4e667/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc
index a889be5..728082e 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc
@@ -39,7 +39,11 @@ Options::Options() : rpc_timeout(kDefaultRpcTimeout),
                      failover_max_retries(kDefaultFailoverMaxRetries),
                      
failover_connection_max_retries(kDefaultFailoverConnectionMaxRetries),
                      authentication(kDefaultAuthentication),
-                     block_size(kDefaultBlockSize) {}
+                     block_size(kDefaultBlockSize),
+                     io_threads_(kDefaultIoThreads)
+{
+
+}
 
 std::string NamenodeInfo::get_host() const {
   return uri.get_host();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e7f4e667/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
index d805716..a5f3aad 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
@@ -79,11 +79,20 @@ FileSystem *FileSystem::New(
   return new FileSystemImpl(io_service, user_name, options);
 }
 
+// Factory overload that builds a FileSystemImpl on a caller-supplied shared
+// IoService and returns it as a raw FileSystem pointer.
+FileSystem *FileSystem::New(
+    std::shared_ptr<IoService> io_service, const std::string &user_name, const 
Options &options) {
+  return new FileSystemImpl(io_service, user_name, options);
+}
+
 FileSystem *FileSystem::New() {
   // No, this pointer won't be leaked.  The FileSystem takes ownership.
-  IoService *io_service = IoService::New();
+  std::shared_ptr<IoService> io_service = IoService::MakeShared();
   if(!io_service)
     return nullptr;
+  int thread_count = io_service->InitDefaultWorkers();
+  if(thread_count < 1)
+    return nullptr;
+
   std::string user_name = get_effective_user_name("");
   Options options;
   return new FileSystemImpl(io_service, user_name, options);
@@ -123,25 +132,56 @@ const std::string get_effective_user_name(const 
std::string &user_name) {
 }
 
 FileSystemImpl::FileSystemImpl(IoService *&io_service, const std::string 
&user_name, const Options &options) :
-    options_(options), client_name_(GetRandomClientName()), io_service_(
-        static_cast<IoServiceImpl *>(io_service)),
-        nn_(
-          &io_service_->io_service(), options, client_name_,
-          get_effective_user_name(user_name), kNamenodeProtocol,
-          kNamenodeProtocolVersion
-        ), bad_node_tracker_(std::make_shared<BadDataNodeTracker>()),
-        event_handlers_(std::make_shared<LibhdfsEvents>()) {
-
-  LOG_TRACE(kFileSystem, << "FileSystemImpl::FileSystemImpl("
+     io_service_(static_cast<IoServiceImpl *>(io_service)), options_(options),
+     client_name_(GetRandomClientName()),
+     nn_(
+       &io_service_->io_service(), options, client_name_,
+       get_effective_user_name(user_name), kNamenodeProtocol,
+       kNamenodeProtocolVersion
+     ),
+     bad_node_tracker_(std::make_shared<BadDataNodeTracker>()),
+     event_handlers_(std::make_shared<LibhdfsEvents>())
+{
+
+  LOG_DEBUG(kFileSystem, << "FileSystemImpl::FileSystemImpl("
                          << FMT_THIS_ADDR << ") called");
 
   // Poor man's move
   io_service = nullptr;
 
-  /* spawn background threads for asio delegation */
-  unsigned int threads = 1 /* options.io_threads_, pending HDFS-9117 */;
-  for (unsigned int i = 0; i < threads; i++) {
-    AddWorkerThread();
+  unsigned int running_workers = 0;
+  if(options.io_threads_ < 1) {
+    LOG_DEBUG(kFileSystem, << "FileSystemImpl::FileSystemImpl Initializing 
default number of worker threads");
+    running_workers = io_service_->InitDefaultWorkers();
+  } else {
+    LOG_DEBUG(kFileSystem, << "FileSystemImpl::FileSystenImpl Initializing " 
<< options_.io_threads_ << " worker threads.");
+    running_workers = io_service->InitWorkers(options_.io_threads_);
+  }
+
+  if(running_workers < 1) {
+    LOG_WARN(kFileSystem, << "FileSystemImpl::FileSystemImpl was unable to 
start worker threads");
+  }
+}
+
+// Build a FileSystemImpl on an IoService that is shared with the caller.
+// No worker threads are started here.  The constructor only logs how many
+// workers the service already has, and warns when it has none.
+FileSystemImpl::FileSystemImpl(std::shared_ptr<IoService> io_service, const std::string& user_name, const Options &options) :
+     io_service_(std::static_pointer_cast<IoServiceImpl>(io_service)), options_(options),
+     client_name_(GetRandomClientName()),
+     nn_(
+       &io_service_->io_service(), options, client_name_,
+       get_effective_user_name(user_name), kNamenodeProtocol,
+       kNamenodeProtocolVersion
+     ),
+     bad_node_tracker_(std::make_shared<BadDataNodeTracker>()),
+     event_handlers_(std::make_shared<LibhdfsEvents>())
+{
+  LOG_DEBUG(kFileSystem, << "FileSystemImpl::FileSystemImpl("
+                         << FMT_THIS_ADDR << ", shared IoService@" << io_service_.get() << ") called");
+  int worker_thread_count = io_service_->get_worker_thread_count();
+  if(worker_thread_count < 1) {
+    // Terminated with ';' like every other logging call in this file.
+    LOG_WARN(kFileSystem, << "FileSystemImpl::FileSystemImpl IoService provided doesn't have any worker threads. "
+                          << "It needs at least 1 worker to connect to an HDFS cluster.");
+  } else {
+    LOG_DEBUG(kFileSystem, << "FileSystemImpl::FileSystemImpl using " << worker_thread_count << " worker threads.");
   }
 }
 
@@ -154,7 +194,6 @@ FileSystemImpl::~FileSystemImpl() {
    * Once worker threads are joined and deleted the service can be deleted.
    **/
   io_service_->Stop();
-  worker_threads_.clear();
 }
 
 void FileSystemImpl::Connect(const std::string &server,
@@ -230,12 +269,21 @@ void FileSystemImpl::ConnectToDefaultFs(const 
std::function<void(const Status &,
 int FileSystemImpl::AddWorkerThread() {
   LOG_DEBUG(kFileSystem, << "FileSystemImpl::AddWorkerThread("
                                   << FMT_THIS_ADDR << ") called."
-                                  << " Existing thread count = " << 
worker_threads_.size());
+                                  << " Existing thread count = " << 
WorkerThreadCount());
+
+  if(!io_service_)
+    return -1;
 
-  auto service_task = [](IoService *service) { service->Run(); };
-  worker_threads_.push_back(
-      WorkerPtr(new std::thread(service_task, io_service_.get())));
-  return worker_threads_.size();
+  io_service_->AddWorkerThread();
+  return 1;
+}
+
+// Reports how many worker threads the attached IoService has, or -1 when no
+// IoService is attached.
+int FileSystemImpl::WorkerThreadCount() {
+  if(io_service_) {
+    return io_service_->get_worker_thread_count();
+  }
+  return -1;
 }
 
 void FileSystemImpl::Open(
@@ -714,21 +762,6 @@ void FileSystemImpl::DisallowSnapshot(const std::string 
&path,
   nn_.DisallowSnapshot(path, handler);
 }
 
-
-void FileSystemImpl::WorkerDeleter::operator()(std::thread *t) {
-  // It is far too easy to destroy the filesystem (and thus the threadpool)
-  //     from within one of the worker threads, leading to a deadlock.  Let's
-  //     provide some explicit protection.
-  if(t->get_id() == std::this_thread::get_id()) {
-    LOG_ERROR(kFileSystem, << 
"FileSystemImpl::WorkerDeleter::operator(treadptr="
-                           << t << ") : FATAL: Attempted to destroy a thread 
pool"
-                           "from within a callback of the thread pool!");
-  }
-  t->join();
-  delete t;
-}
-
-
 void FileSystemImpl::SetFsEventCallback(fs_event_callback callback) {
   if (event_handlers_) {
     event_handlers_->set_fs_callback(callback);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e7f4e667/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
index 0e9cedd..80978cf 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
@@ -46,7 +46,8 @@ namespace hdfs {
 class FileSystemImpl : public FileSystem {
 public:
   MEMCHECKED_CLASS(FileSystemImpl)
-  FileSystemImpl(IoService *&io_service, const std::string& user_name, const 
Options &options);
+  explicit FileSystemImpl(IoService *&io_service, const std::string& 
user_name, const Options &options);
+  explicit FileSystemImpl(std::shared_ptr<IoService>, const std::string& 
user_name, const Options &options);
   ~FileSystemImpl() override;
 
   /* attempt to connect to namenode, return bad status on failure */
@@ -176,7 +177,7 @@ public:
   int AddWorkerThread();
 
   /* how many worker threads are servicing asio requests */
-  int WorkerThreadCount() { return worker_threads_.size(); }
+  int WorkerThreadCount();
 
   /* all monitored events will need to lookup handlers */
   std::shared_ptr<LibhdfsEvents> get_event_handlers();
@@ -184,24 +185,18 @@ public:
   Options get_options();
 
 private:
-  const Options options_;
-  const std::string client_name_;
-  std::string cluster_name_;
   /**
    *  The IoService must be the first member variable to ensure that it gets
    *  destroyed last.  This allows other members to dequeue things from the
    *  service in their own destructors.
    **/
-  std::unique_ptr<IoServiceImpl> io_service_;
+  std::shared_ptr<IoServiceImpl> io_service_;
+  const Options options_;
+  const std::string client_name_;
+  std::string cluster_name_;
   NameNodeOperations nn_;
   std::shared_ptr<BadDataNodeTracker> bad_node_tracker_;
 
-  struct WorkerDeleter {
-    void operator()(std::thread *t);
-  };
-  typedef std::unique_ptr<std::thread, WorkerDeleter> WorkerPtr;
-  std::vector<WorkerPtr> worker_threads_;
-
   /**
    * Runtime event monitoring handlers.
    * Note:  This is really handy to have for advanced usage but

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e7f4e667/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt
index 3dd4aae..395fad5 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt
@@ -112,6 +112,11 @@ add_executable(logging_test logging_test.cc)
 target_link_libraries(logging_test common gmock_main bindings_c fs rpc proto 
common reader connection ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} 
${SASL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT})
 add_memcheck_test(logging_test logging_test)
 
+add_executable(hdfs_ioservice_test hdfs_ioservice_test.cc)
+target_link_libraries(hdfs_ioservice_test fs gmock_main common 
${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} ${SASL_LIBRARIES} 
${CMAKE_THREAD_LIBS_INIT})
+add_memcheck_test(hdfs_ioservice hdfs_ioservice_test)
+
+
 #
 #
 #   INTEGRATION TESTS - TESTS THE FULL LIBRARY AGAINST ACTUAL SERVERS

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e7f4e667/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfspp_mini_dfs.h
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfspp_mini_dfs.h
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfspp_mini_dfs.h
index 3ec58e1..aecced1 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfspp_mini_dfs.h
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfspp_mini_dfs.h
@@ -113,7 +113,7 @@ private:
 
 class MiniCluster  {
 public:
-  MiniCluster() : io_service(IoService::New()) {
+  MiniCluster() : io_service(IoService::MakeShared()) {
     struct NativeMiniDfsConf conf = {
         1, /* doFormat */
         0, /* webhdfs */
@@ -137,6 +137,10 @@ public:
   // Connect via the C++ API
   FSHandle connect(const std::string username) {
     Options options;
+
+    unsigned int worker_count = io_service->InitDefaultWorkers();
+    EXPECT_NE(0, worker_count);
+
     FileSystem * fs = FileSystem::New(io_service, username, options);
     EXPECT_NE(nullptr, fs);
     FSHandle result(fs);
@@ -184,7 +188,7 @@ public:
 
 protected:
   struct NativeMiniDfsCluster* clusterInfo;
-  IoService * io_service;
+  std::shared_ptr<IoService> io_service;
 };
 
 } // namespace


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to