HDFS-11028: libhdfs++: FileSystem needs to be able to cancel pending 
connections.  Contributed by James Clampffer


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/872ca881
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/872ca881
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/872ca881

Branch: refs/heads/HDFS-12996
Commit: 872ca88101473426a24c9db81c36926059e4e483
Parents: 996f140
Author: James <j...@apache.org>
Authored: Mon Jan 23 18:19:13 2017 -0500
Committer: Hanisha Koneru <hanishakon...@apache.org>
Committed: Mon Mar 26 11:11:03 2018 -0700

----------------------------------------------------------------------
 .../native/libhdfspp/examples/c/CMakeLists.txt  |   1 +
 .../examples/c/connect_cancel/CMakeLists.txt    |  35 ++++
 .../examples/c/connect_cancel/connect_cancel.c  | 107 +++++++++++++
 .../libhdfspp/examples/cpp/CMakeLists.txt       |   1 +
 .../examples/cpp/connect_cancel/CMakeLists.txt  |  29 ++++
 .../cpp/connect_cancel/connect_cancel.cc        | 158 +++++++++++++++++++
 .../native/libhdfspp/include/hdfspp/hdfs_ext.h  |  31 ++++
 .../native/libhdfspp/include/hdfspp/hdfspp.h    |   6 +
 .../native/libhdfspp/lib/bindings/c/hdfs.cc     | 108 +++++++++++++
 .../libhdfspp/lib/common/continuation/asio.h    |  86 ----------
 .../lib/common/continuation/continuation.h      |  12 --
 .../libhdfspp/lib/common/hdfs_configuration.cc  |  25 ++-
 .../libhdfspp/lib/common/namenode_info.cc       | 123 ++++++++++-----
 .../src/main/native/libhdfspp/lib/common/util.h |  68 ++++++++
 .../main/native/libhdfspp/lib/fs/filesystem.cc  |  42 ++++-
 .../main/native/libhdfspp/lib/fs/filesystem.h   |   8 +
 .../libhdfspp/lib/fs/namenode_operations.cc     |   4 +
 .../libhdfspp/lib/fs/namenode_operations.h      |   2 +
 .../main/native/libhdfspp/lib/rpc/rpc_engine.cc |  21 ++-
 .../main/native/libhdfspp/lib/rpc/rpc_engine.h  |   5 +
 20 files changed, 729 insertions(+), 143 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/872ca881/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/c/CMakeLists.txt
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/c/CMakeLists.txt
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/c/CMakeLists.txt
index 76880cd..a73d2bc 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/c/CMakeLists.txt
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/c/CMakeLists.txt
@@ -17,3 +17,4 @@
 #
 
 add_subdirectory(cat)
+add_subdirectory(connect_cancel)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/872ca881/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/c/connect_cancel/CMakeLists.txt
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/c/connect_cancel/CMakeLists.txt
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/c/connect_cancel/CMakeLists.txt
new file mode 100644
index 0000000..56aeeea
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/c/connect_cancel/CMakeLists.txt
@@ -0,0 +1,35 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Default LIBHDFSPP_DIR to the default install location.  You can override
+#    it by add -DLIBHDFSPP_DIR=... to your cmake invocation
+set(LIBHDFSPP_DIR CACHE STRING ${CMAKE_INSTALL_PREFIX})
+
+include_directories( ${LIBHDFSPP_DIR}/include )
+link_directories( ${LIBHDFSPP_DIR}/lib )
+
+add_executable(connect_cancel_c connect_cancel.c)
+target_link_libraries(connect_cancel_c hdfspp uriparser2)
+
+# Several examples in different languages need to produce executables with
+# same names. To allow executables with same names we keep their CMake
+# names different, but specify their executable names as follows:
+set_target_properties( connect_cancel_c
+    PROPERTIES
+    OUTPUT_NAME "connect_cancel_c"
+)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/872ca881/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/c/connect_cancel/connect_cancel.c
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/c/connect_cancel/connect_cancel.c
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/c/connect_cancel/connect_cancel.c
new file mode 100644
index 0000000..f6af6d1
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/c/connect_cancel/connect_cancel.c
@@ -0,0 +1,107 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+*/
+
+/*
+  Attempt to connect to a cluster and use Control-C to bail out if it takes a 
while.
+  Valid config must be in environment variable $HADOOP_CONF_DIR
+*/
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <signal.h>
+#include <unistd.h>
+
+#include "hdfspp/hdfs_ext.h"
+#include "common/util_c.h"
+
+#define ERROR_BUFFER_SIZE 1024
+
+// Global so signal handler can get at it
+hdfsFS fs = NULL;
+
+const char *catch_enter  = "In signal handler, going to try and cancel.\n";
+const char *catch_cancel = "hdfsCancelPendingConnect has been canceled in the 
signal handler.\n";
+const char *catch_exit   = "Exiting the signal handler.\n";
+
+// Print to stdout without calling malloc or otherwise indirectly modify 
userspace state.
+// Write calls to stdout may still interleave with stuff coming from elsewhere.
+static void sighandler_direct_stdout(const char *msg) {
+  if(!msg)
+    return;
+  ssize_t res = write(1 /*posix stdout fd*/, msg, strlen(msg));
+  (void)res;
+}
+
+static void sig_catch(int val) {
+  // Beware of calling things that aren't reentrant e.g. malloc while in a 
signal handler.
+  sighandler_direct_stdout(catch_enter);
+
+  if(fs) {
+    hdfsCancelPendingConnection(fs);
+    sighandler_direct_stdout(catch_cancel);
+  }
+  sighandler_direct_stdout(catch_exit);
+}
+
+
+int main(int argc, char** argv) {
+  hdfsSetLoggingLevel(HDFSPP_LOG_LEVEL_INFO);
+  signal(SIGINT, sig_catch);
+
+  char error_text[ERROR_BUFFER_SIZE];
+  if (argc != 1) {
+    fprintf(stderr, "usage: ./connect_cancel_c\n");
+    ShutdownProtobufLibrary_C();
+    exit(EXIT_FAILURE);
+  }
+
+  const char *hdfsconfdir = getenv("HADOOP_CONF_DIR");
+  if(!hdfsconfdir) {
+    fprintf(stderr, "$HADOOP_CONF_DIR must be set\n");
+    ShutdownProtobufLibrary_C();
+    exit(EXIT_FAILURE);
+  }
+
+  struct hdfsBuilder* builder = hdfsNewBuilderFromDirectory(hdfsconfdir);
+
+  fs = hdfsAllocateFileSystem(builder);
+  if (fs == NULL) {
+    hdfsGetLastError(error_text, ERROR_BUFFER_SIZE);
+    fprintf(stderr, "hdfsAllocateFileSystem returned null.\n%s\n", error_text);
+    hdfsFreeBuilder(builder);
+    ShutdownProtobufLibrary_C();
+    exit(EXIT_FAILURE);
+  }
+
+  int connected = hdfsConnectAllocated(fs, builder);
+  if (connected != 0) {
+    hdfsGetLastError(error_text, ERROR_BUFFER_SIZE);
+    fprintf(stderr, "hdfsConnectAllocated errored.\n%s\n", error_text);
+    hdfsFreeBuilder(builder);
+    ShutdownProtobufLibrary_C();
+    exit(EXIT_FAILURE);
+  }
+
+  hdfsDisconnect(fs);
+  hdfsFreeBuilder(builder);
+  // Clean up static data and prevent valgrind memory leaks
+  ShutdownProtobufLibrary_C();
+  return 0;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/872ca881/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/CMakeLists.txt
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/CMakeLists.txt
index 9e16b0b..8893f03 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/CMakeLists.txt
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/CMakeLists.txt
@@ -19,3 +19,4 @@
 add_subdirectory(cat)
 add_subdirectory(gendirs)
 add_subdirectory(find)
+add_subdirectory(connect_cancel)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/872ca881/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/connect_cancel/CMakeLists.txt
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/connect_cancel/CMakeLists.txt
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/connect_cancel/CMakeLists.txt
new file mode 100644
index 0000000..a029a99
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/connect_cancel/CMakeLists.txt
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Default LIBHDFSPP_DIR to the default install location.  You can override
+#    it by add -DLIBHDFSPP_DIR=... to your cmake invocation
+set(LIBHDFSPP_DIR CACHE STRING ${CMAKE_INSTALL_PREFIX})
+
+include_directories( ${LIBHDFSPP_DIR}/include )
+link_directories( ${LIBHDFSPP_DIR}/lib )
+
+add_executable(connect_cancel connect_cancel.cc)
+target_link_libraries(connect_cancel hdfspp)
+
+

http://git-wip-us.apache.org/repos/asf/hadoop/blob/872ca881/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/connect_cancel/connect_cancel.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/connect_cancel/connect_cancel.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/connect_cancel/connect_cancel.cc
new file mode 100644
index 0000000..7e5daa9
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/connect_cancel/connect_cancel.cc
@@ -0,0 +1,158 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+*/
+
+
+#include "hdfspp/hdfspp.h"
+#include "common/hdfs_configuration.h"
+#include "common/configuration_loader.h"
+
+#include <google/protobuf/stubs/common.h>
+
+#include <signal.h>
+#include <unistd.h>
+
+#include <thread>
+#include <iostream>
+
+// Simple example of how to cancel an async connect call.
+// Here Control-C (SIGINT) is caught in order to invoke the FS level cancel and
+// properly tear down the process.  Valgrind should show no leaked memory on 
exit
+// when cancel has been called.  URI parsing code is omitted and defaultFs from
+// /etc/hadoop/conf or $HADOOP_CONF_DIR is always used.
+
+// Scoped globally to make it simple to reference from the signal handler.
+std::shared_ptr<hdfs::FileSystem> fs;
+
+const std::string catch_enter("In signal handler, going to try and cancel 
FileSystem::Connect.\n");
+const std::string catch_cancel("FileSystem::Cancel has been canceled in the 
signal handler.\n");
+const std::string catch_exit("Exiting the signal handler.\n");
+
+// Avoid IO reentrancy issues, see comments in signal handler below.
+// It's possible that the write interleaves with another write call,
+// but it won't corrupt the stack or heap.
+static void sighandler_direct_stdout(const std::string &msg) {
+  ssize_t res = ::write(1 /*posix stdout FD*/, msg.data(), msg.size());
+  // In production you'd want to check res, but error handling code will
+  // need to be fairly application specific if it's going to properly
+  // avoid reentrant calls to malloc.
+  (void)res;
+}
+
+// Signal handler to make a SIGINT call cancel rather than exit().
+static void sig_catch(int val) {
+  (void)val;
+  // This is avoiding the tricky bits of signal handling, notably that the
+  // underlying string manipulation and IO functions used by the the logger
+  // are unlikely to be reentrant.
+  //
+  // Production code could mask out all logging on handler entry and enable
+  // it again on exit; here we just assume it's "good enough" and some
+  // (possibly broken) log messages are better than none.
+
+  sighandler_direct_stdout(catch_enter);
+  if(fs) {
+    // This will invoke the callback immediately with an OperationCanceled 
status
+    fs->CancelPendingConnect();
+    sighandler_direct_stdout(catch_cancel);
+  }
+  sighandler_direct_stdout(catch_exit);
+}
+
+
+int main(int arg_token_count, const char **args) {
+  (void)args;
+  if(arg_token_count != 1) {
+    std::cerr << "usage: ./connect_cancel";
+    google::protobuf::ShutdownProtobufLibrary();
+    exit(EXIT_FAILURE);
+  }
+
+  // Register signal handle to asynchronously invoke cancel from outside the 
main thread.
+  signal(SIGINT, sig_catch);
+
+  // Generic setup/config code much like the other examples.
+  hdfs::Options options;
+  //Setting the config path to the default: "$HADOOP_CONF_DIR" or 
"/etc/hadoop/conf"
+  hdfs::ConfigurationLoader loader;
+  //Loading default config files core-site.xml and hdfs-site.xml from the 
config path
+  hdfs::optional<hdfs::HdfsConfiguration> config = 
loader.LoadDefaultResources<hdfs::HdfsConfiguration>();
+  //TODO: HDFS-9539 - after this is resolved, valid config will always be 
returned.
+  if(config){
+    //Loading options from the config
+    options = config->GetOptions();
+  }
+
+
+  // Start an IoService and some worker threads
+  std::shared_ptr<hdfs::IoService> service = hdfs::IoService::MakeShared();
+  if(nullptr == service) {
+    std::cerr << "Unable to create IoService" << std::endl;
+    fs.reset();
+    // Nasty hack to clean up for valgrind since we don't have the C++17 
optional<T>::reset method
+    config = decltype(config)();
+    google::protobuf::ShutdownProtobufLibrary();
+    exit(EXIT_FAILURE);
+  }
+
+  unsigned int worker_count = service->InitDefaultWorkers();
+  if(worker_count < 1) {
+    std::cerr << "Unable to create IoService worker threads";
+    fs.reset();
+    service->Stop();
+    config = decltype(config)();
+    google::protobuf::ShutdownProtobufLibrary();
+    exit(EXIT_FAILURE);
+  }
+
+  // Set up and connect to the FileSystem
+  fs.reset(hdfs::FileSystem::New(service, "", options));
+  if(nullptr == fs) {
+    std::cerr << "Unable to create FileSystem" << std::endl;
+    fs.reset();
+    service->Stop();
+    config = decltype(config)();
+    google::protobuf::ShutdownProtobufLibrary();
+    exit(EXIT_FAILURE);
+  }
+
+  hdfs::Status status = fs->ConnectToDefaultFs();
+  if (!status.ok()) {
+    if(!options.defaultFS.get_host().empty()){
+      std::cerr << "Error connecting to " << options.defaultFS << ". " << 
status.ToString() << std::endl;
+    } else {
+      std::cerr << "Error connecting to the cluster: defaultFS is empty. " << 
status.ToString() << std::endl;
+    }
+    fs.reset();
+    service->Stop();
+    config = decltype(config)();
+    google::protobuf::ShutdownProtobufLibrary();
+    exit(EXIT_FAILURE);
+  }
+
+  fs.reset();
+  service->Stop();
+  config = decltype(config)();
+  google::protobuf::ShutdownProtobufLibrary();
+
+  return 0;
+}
+
+
+
+

http://git-wip-us.apache.org/repos/asf/hadoop/blob/872ca881/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h
index 72434e6..2b15cf4 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h
@@ -299,6 +299,7 @@ int hdfsPreAttachFileMonitor(libhdfspp_file_event_callback 
handler, int64_t cook
  *                    objects; NULL on error or empty result.
  *                    errno is set to non-zero on error or zero on success.
  **/
+LIBHDFS_EXTERNAL
 hdfsFileInfo * hdfsFind(hdfsFS fs, const char* path, const char* name, 
uint32_t * numEntries);
 
 
@@ -314,6 +315,7 @@ hdfsFileInfo * hdfsFind(hdfsFS fs, const char* path, const 
char* name, uint32_t
  *  @param name    Name to be given to the created snapshot (may be NULL)
  *  @return        0 on success, corresponding errno on failure
  **/
+LIBHDFS_EXTERNAL
 int hdfsCreateSnapshot(hdfsFS fs, const char* path, const char* name);
 
 /**
@@ -324,6 +326,7 @@ int hdfsCreateSnapshot(hdfsFS fs, const char* path, const 
char* name);
  *  @param name    Name of the snapshot to be deleted (must be non-blank)
  *  @return        0 on success, corresponding errno on failure
  **/
+LIBHDFS_EXTERNAL
 int hdfsDeleteSnapshot(hdfsFS fs, const char* path, const char* name);
 
 /**
@@ -333,6 +336,7 @@ int hdfsDeleteSnapshot(hdfsFS fs, const char* path, const 
char* name);
  *  @param path    Path to the directory to be made snapshottable (must be 
non-blank)
  *  @return        0 on success, corresponding errno on failure
  **/
+LIBHDFS_EXTERNAL
 int hdfsAllowSnapshot(hdfsFS fs, const char* path);
 
 /**
@@ -342,8 +346,35 @@ int hdfsAllowSnapshot(hdfsFS fs, const char* path);
  *  @param path    Path to the directory to be made non-snapshottable (must be 
non-blank)
  *  @return        0 on success, corresponding errno on failure
  **/
+LIBHDFS_EXTERNAL
 int hdfsDisallowSnapshot(hdfsFS fs, const char* path);
 
+/**
+ * Create a FileSystem based on the builder but don't connect
+ * @param bld     Used to populate config options in the same manner as 
hdfsBuilderConnect.
+ *                Does not free builder.
+ **/
+LIBHDFS_EXTERNAL
+hdfsFS hdfsAllocateFileSystem(struct hdfsBuilder *bld);
+
+/**
+ * Connect a FileSystem created with hdfsAllocateFileSystem
+ * @param fs      A disconnected FS created with hdfsAllocateFileSystem
+ * @param bld     The same or exact copy of the builder used for Allocate, we 
still need a few fields.
+ *                Does not free builder.
+ * @return        0 on success, corresponding errno on failure
+ **/
+LIBHDFS_EXTERNAL
+int hdfsConnectAllocated(hdfsFS fs, struct hdfsBuilder *bld);
+
+/**
+ * Cancel a pending connection on a FileSystem
+ * @param fs      A fs in the process of connecting using hdfsConnectAllocated 
in another thread.
+ * @return        0 on success, corresponding errno on failure
+ **/
+LIBHDFS_EXTERNAL
+int hdfsCancelPendingConnection(hdfsFS fs);
+
 
 #ifdef __cplusplus
 } /* end extern "C" */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/872ca881/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h
index 5fbd3d8..46a1e61 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h
@@ -217,6 +217,12 @@ class FileSystem {
   virtual Status ConnectToDefaultFs() = 0;
 
   /**
+   * Cancels any attempts to connect to the HDFS cluster.
+   * FileSystem is expected to be destroyed after invoking this.
+   */
+  virtual bool CancelPendingConnect() = 0;
+
+  /**
    * Open a file on HDFS. The call issues an RPC to the NameNode to
    * gather the locations of all blocks in the file and to return a
    * new instance of the @ref InputStream object.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/872ca881/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
index dd7d00c..0608244 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
@@ -332,6 +332,94 @@ hdfsFS doHdfsConnect(optional<std::string> nn, 
optional<tPort> port, optional<st
   }
 }
 
+hdfsFS hdfsAllocateFileSystem(struct hdfsBuilder *bld) {
+  // Same idea as the first half of doHdfsConnect, but return the wrapped FS 
before
+  // connecting.
+  try {
+    errno = 0;
+    std::shared_ptr<IoService> io_service = IoService::MakeShared();
+
+    int io_thread_count = bld->config.GetOptions().io_threads_;
+    if(io_thread_count < 1) {
+      io_service->InitDefaultWorkers();
+    } else {
+      io_service->InitWorkers(io_thread_count);
+    }
+
+    FileSystem *fs = FileSystem::New(io_service, bld->user.value_or(""), 
bld->config.GetOptions());
+    if (!fs) {
+      ReportError(ENODEV, "Could not create FileSystem object");
+      return nullptr;
+    }
+
+    if (fsEventCallback) {
+      fs->SetFsEventCallback(fsEventCallback.value());
+    }
+
+    return new hdfs_internal(fs);
+  } catch (const std::exception &e) {
+    ReportException(e);
+    return nullptr;
+  } catch (...) {
+    ReportCaughtNonException();
+    return nullptr;
+  }
+  return nullptr;
+}
+
+int hdfsConnectAllocated(hdfsFS fs, struct hdfsBuilder *bld) {
+  if(!CheckSystem(fs)) {
+    return ENODEV;
+  }
+
+  if(!bld) {
+    ReportError(ENODEV, "No hdfsBuilder object supplied");
+    return ENODEV;
+  }
+
+  // Get C++ FS to do connect
+  FileSystem *fsImpl = fs->get_impl();
+  if(!fsImpl) {
+    ReportError(ENODEV, "Null FileSystem implementation");
+    return ENODEV;
+  }
+
+  // Unpack the required bits of the hdfsBuilder
+  optional<std::string> nn = bld->overrideHost;
+  optional<tPort> port = bld->overridePort;
+  optional<std::string> user = bld->user;
+
+  // try-catch in case some of the third-party stuff throws
+  try {
+    Status status;
+    if (nn || port) {
+      if (!port) {
+        port = kDefaultPort;
+      }
+      std::string port_as_string = std::to_string(*port);
+      status = fsImpl->Connect(nn.value_or(""), port_as_string);
+    } else {
+      status = fsImpl->ConnectToDefaultFs();
+    }
+
+    if (!status.ok()) {
+      Error(status);
+      return ENODEV;
+    }
+
+    // 0 to indicate a good connection
+    return 0;
+  } catch (const std::exception & e) {
+    ReportException(e);
+    return ENODEV;
+  } catch (...) {
+    ReportCaughtNonException();
+    return ENODEV;
+  }
+
+  return 0;
+}
+
 hdfsFS hdfsConnect(const char *nn, tPort port) {
   return hdfsConnectAsUser(nn, port, "");
 }
@@ -350,6 +438,26 @@ hdfsFS hdfsConnectNewInstance(const char* nn, tPort port) {
   return hdfsConnectAsUser(nn, port, "");
 }
 
+int hdfsCancelPendingConnection(hdfsFS fs) {
+  // todo: stick an enum in hdfs_internal to check the connect state
+  if(!CheckSystem(fs)) {
+    return ENODEV;
+  }
+
+  FileSystem *fsImpl = fs->get_impl();
+  if(!fsImpl) {
+    ReportError(ENODEV, "Null FileSystem implementation");
+    return ENODEV;
+  }
+
+  bool canceled = fsImpl->CancelPendingConnect();
+  if(canceled) {
+    return 0;
+  } else {
+    return EINTR;
+  }
+}
+
 int hdfsDisconnect(hdfsFS fs) {
   try
   {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/872ca881/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/asio.h
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/asio.h
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/asio.h
index 3f650ce..193358f 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/asio.h
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/asio.h
@@ -34,24 +34,6 @@ namespace asio_continuation {
 
 using namespace continuation;
 
-template <class Stream, class MutableBufferSequence>
-class ReadContinuation : public Continuation {
-public:
-  ReadContinuation(std::shared_ptr<Stream>& stream, const 
MutableBufferSequence &buffer)
-      : stream_(stream), buffer_(buffer) {}
-  virtual void Run(const Next &next) override {
-    auto handler =
-        [next](const asio::error_code &ec, size_t) { next(ToStatus(ec)); };
-    asio::async_read(*stream_, buffer_, handler);
-  }
-
-private:
-  // prevent construction from raw ptr
-  ReadContinuation(Stream *stream, MutableBufferSequence &buffer);
-  std::shared_ptr<Stream> stream_;
-  MutableBufferSequence buffer_;
-};
-
 template <class Stream, class ConstBufferSequence>
 class WriteContinuation : public Continuation {
 public:
@@ -71,80 +53,12 @@ private:
   ConstBufferSequence buffer_;
 };
 
-template <class Socket, class Iterator>
-class ConnectContinuation : public Continuation {
-public:
-  ConnectContinuation(Socket *socket, Iterator begin, Iterator end,
-                      Iterator *connected_endpoint)
-      : socket_(socket), begin_(begin), end_(end),
-        connected_endpoint_(connected_endpoint) {}
-
-  virtual void Run(const Next &next) override {
-    auto handler = [this, next](const asio::error_code &ec, Iterator it) {
-      if (connected_endpoint_) {
-        *connected_endpoint_ = it;
-      }
-      next(ToStatus(ec));
-    };
-    asio::async_connect(*socket_, begin_, end_, handler);
-  }
-
-private:
-  Socket *socket_;
-  Iterator begin_;
-  Iterator end_;
-  Iterator *connected_endpoint_;
-};
-
-template <class OutputIterator>
-class ResolveContinuation : public Continuation {
-public:
-  ResolveContinuation(::asio::io_service *io_service, const std::string 
&server,
-                      const std::string &service, OutputIterator result)
-      : resolver_(*io_service), query_(server, service), result_(result) {}
-
-  virtual void Run(const Next &next) override {
-    using resolver = ::asio::ip::tcp::resolver;
-    auto handler =
-        [this, next](const asio::error_code &ec, resolver::iterator it) {
-          if (!ec) {
-            std::copy(it, resolver::iterator(), result_);
-          }
-          next(ToStatus(ec));
-        };
-    resolver_.async_resolve(query_, handler);
-  }
-
-private:
-  ::asio::ip::tcp::resolver resolver_;
-  ::asio::ip::tcp::resolver::query query_;
-  OutputIterator result_;
-};
-
 template <class Stream, class ConstBufferSequence>
 static inline Continuation *Write(std::shared_ptr<Stream> stream,
                                   const ConstBufferSequence &buffer) {
   return new WriteContinuation<Stream, ConstBufferSequence>(stream, buffer);
 }
 
-template <class Stream, class MutableBufferSequence>
-static inline Continuation *Read(std::shared_ptr<Stream> stream,
-                                 const MutableBufferSequence &buffer) {
-  return new ReadContinuation<Stream, MutableBufferSequence>(stream, buffer);
-}
-
-template <class Socket, class Iterator>
-static inline Continuation *Connect(Socket *socket, Iterator begin,
-                                    Iterator end) {
-  return new ConnectContinuation<Socket, Iterator>(socket, begin, end, 
nullptr);
-}
-
-template <class OutputIterator>
-static inline Continuation *
-Resolve(::asio::io_service *io_service, const std::string &server,
-        const std::string &service, OutputIterator result) {
-  return new ResolveContinuation<OutputIterator>(io_service, server, service, 
result);
-}
 }
 }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/872ca881/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h
index 4c9b8ad..18aa146 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h
@@ -131,18 +131,6 @@ template <class State> inline void 
Pipeline<State>::Run(UserHandler &&handler) {
   Schedule(Status::OK());
 }
 
-template <class Handler> class BindContinuation : public Continuation {
-public:
-  BindContinuation(const Handler &handler) : handler_(handler) {}
-  virtual void Run(const Next &next) override { handler_(next); }
-
-private:
-  Handler handler_;
-};
-
-template <class Handler> static inline Continuation *Bind(const Handler 
&handler) {
-  return new BindContinuation<Handler>(handler);
-}
 }
 }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/872ca881/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.cc
index 6778bad..ab87506 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.cc
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.cc
@@ -71,6 +71,17 @@ std::vector<std::string> SplitOnComma(const std::string &s, 
bool include_empty_s
   return res;
 }
 
+std::string RemoveSpaces(const std::string &str) {
+  std::string res;
+  for(unsigned int i=0; i<str.size(); i++) {
+    char curr = str[i];
+    if(curr != ' ') {
+      res += curr;
+    }
+  }
+  return res;
+}
+
 // Prepend hdfs:// to string if there isn't already a scheme
 // Converts unset optional into empty string
 std::string PrependHdfsScheme(optional<std::string> str) {
@@ -92,6 +103,8 @@ struct ha_parse_error : public std::exception {
 };
 
 std::vector<NamenodeInfo> HdfsConfiguration::LookupNameService(const 
std::string &nameservice) {
+  LOG_TRACE(kRPC, << "HDFSConfiguration@" << this << "::LookupNameService( 
nameservice=" << nameservice<< " ) called");
+
   std::vector<NamenodeInfo> namenodes;
   try {
     // Find namenodes that belong to nameservice
@@ -104,8 +117,10 @@ std::vector<NamenodeInfo> 
HdfsConfiguration::LookupNameService(const std::string
       else
         throw ha_parse_error("unable to find " + service_nodes);
 
-      for(auto it=namenode_ids.begin(); it != namenode_ids.end(); it++)
-        LOG_INFO(kRPC, << "Namenode: " << *it);
+      for(unsigned int i=0; i<namenode_ids.size(); i++) {
+        namenode_ids[i] = RemoveSpaces(namenode_ids[i]);
+        LOG_INFO(kRPC, << "Namenode: " << namenode_ids[i]);
+      }
     }
 
     // should this error if we only find 1 NN?
@@ -123,7 +138,11 @@ std::vector<NamenodeInfo> 
HdfsConfiguration::LookupNameService(const std::string
       }
 
       URI uri = node_uri.value();
-      LOG_INFO(kRPC, << "Read the following HA Namenode URI from config" << 
uri.GetDebugString());
+      if(uri.str() == "") {
+        LOG_WARN(kRPC, << "Attempted to read info for nameservice " << 
nameservice << " node " << dom_node_name << " but didn't find anything.")
+      } else {
+        LOG_INFO(kRPC, << "Read the following HA Namenode URI from config" << 
uri.GetDebugString());
+      }
 
       NamenodeInfo node(nameservice, *node_id, uri);
       namenodes.push_back(node);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/872ca881/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.cc
index bc38be7..bd43091 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.cc
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.cc
@@ -18,7 +18,7 @@
 
 #include "namenode_info.h"
 
-#include "common/continuation/asio.h"
+#include "common/util.h"
 #include "common/logging.h"
 
 #include <sstream>
@@ -71,62 +71,107 @@ bool ResolveInPlace(::asio::io_service *ioservice, 
ResolvedNamenodeInfo &info) {
   return true;
 }
 
+typedef std::vector<asio::ip::tcp::endpoint> endpoint_vector;
+
+// RAII wrapper
+class ScopedResolver {
+ private:
+  ::asio::io_service *io_service_;
+  std::string host_;
+  std::string port_;
+  ::asio::ip::tcp::resolver::query query_;
+  ::asio::ip::tcp::resolver resolver_;
+  endpoint_vector endpoints_;
+
+  // Caller blocks on access if resolution isn't finished
+  std::shared_ptr<std::promise<Status>> result_status_;
+ public:
+  ScopedResolver(::asio::io_service *service, const std::string &host, const 
std::string &port) :
+        io_service_(service), host_(host), port_(port), query_(host, port), 
resolver_(*io_service_)
+  {
+    if(!io_service_)
+      LOG_ERROR(kAsyncRuntime, << "ScopedResolver@" << this << " passed 
nullptr to io_service");
+  }
 
-std::vector<ResolvedNamenodeInfo> BulkResolve(::asio::io_service *ioservice, 
const std::vector<NamenodeInfo> &nodes) {
-  using namespace asio_continuation;
+  ~ScopedResolver() {
+    resolver_.cancel();
+  }
 
-  typedef std::vector<asio::ip::tcp::endpoint> endpoint_vector;
-  typedef Pipeline<endpoint_vector> resolve_pipeline_t;
+  bool BeginAsyncResolve() {
+    // result_status_ would only exist if this was previously called.  Invalid 
state.
+    if(result_status_) {
+      LOG_ERROR(kAsyncRuntime, << "ScopedResolver@" << this << 
"::BeginAsyncResolve invalid call: may only be called once per instance");
+      return false;
+    } else if(!io_service_) {
+      LOG_ERROR(kAsyncRuntime, << "ScopedResolver@" << this << 
"::BeginAsyncResolve invalid call: null io_service");
+      return false;
+    }
 
+    // Now set up the promise, set it in async_resolve's callback
+    result_status_ = std::make_shared<std::promise<Status>>();
+
+    // Callback to pull a copy of endpoints out of resolver and set promise
+    auto callback = [this](const asio::error_code &ec, 
::asio::ip::tcp::resolver::iterator out) {
+      if(!ec) {
+        std::copy(out, ::asio::ip::tcp::resolver::iterator(), 
std::back_inserter(endpoints_));
+      }
+      result_status_->set_value( ToStatus(ec) );
+    };
+    resolver_.async_resolve(query_, callback);
+    return true;
+  }
 
-  std::vector<std::pair<resolve_pipeline_t*, 
std::shared_ptr<std::promise<Status>>>> pipelines;
-  pipelines.reserve(nodes.size());
+  Status Join() {
+    if(!result_status_) {
+      std::ostringstream errmsg;
+      errmsg <<  "ScopedResolver@" << this << "Join invalid call: promise 
never set";
+      return Status::InvalidArgument(errmsg.str().c_str());
+    }
+
+    std::future<Status> future_result = result_status_->get_future();
+    Status res = future_result.get();
+    return res;
+  }
+
+  endpoint_vector GetEndpoints() {
+    // Explicitly return by value to decouple lifecycles.
+    return endpoints_;
+  }
+};
+
+std::vector<ResolvedNamenodeInfo> BulkResolve(::asio::io_service *ioservice, 
const std::vector<NamenodeInfo> &nodes) {
+  std::vector< std::unique_ptr<ScopedResolver> > resolvers;
+  resolvers.reserve(nodes.size());
 
   std::vector<ResolvedNamenodeInfo> resolved_info;
-  // This must never reallocate once async ops begin
   resolved_info.reserve(nodes.size());
 
   for(unsigned int i=0; i<nodes.size(); i++) {
     std::string host = nodes[i].get_host();
     std::string port = nodes[i].get_port();
 
-    ResolvedNamenodeInfo resolved;
-    resolved = nodes[i];
-    resolved_info.push_back(resolved);
-
-    // build the pipeline
-    resolve_pipeline_t *pipeline = resolve_pipeline_t::Create();
-    auto resolve_step = Resolve(ioservice, host, port, 
std::back_inserter(pipeline->state()));
-    pipeline->Push(resolve_step);
-
-    // make a status associated with current pipeline
-    std::shared_ptr<std::promise<Status>> active_stat = 
std::make_shared<std::promise<Status>>();
-    pipelines.push_back(std::make_pair(pipeline, active_stat));
-
-    pipeline->Run([i,active_stat, &resolved_info](const Status &s, const 
endpoint_vector &ends){
-      resolved_info[i].endpoints = ends;
-      active_stat->set_value(s);
-    });
-
+    resolvers.emplace_back(new ScopedResolver(ioservice, host, port));
+    resolvers[i]->BeginAsyncResolve();
   }
 
   // Join all async operations
-  std::vector<ResolvedNamenodeInfo> return_set;
-  for(unsigned int i=0; i<pipelines.size();i++) {
-    std::shared_ptr<std::promise<Status>> promise = pipelines[i].second;
-
-    std::future<Status> future = promise->get_future();
-    Status stat = future.get();
-
-    // Clear endpoints if we hit an error
-    if(!stat.ok()) {
-      LOG_WARN(kRPC, << "Unable to resolve endpoints for " << 
nodes[i].uri.str());
-      resolved_info[i].endpoints.clear();
+  for(unsigned int i=0; i < resolvers.size(); i++) {
+    Status asyncReturnStatus = resolvers[i]->Join();
+
+    ResolvedNamenodeInfo info;
+    info = nodes[i];
+
+    if(asyncReturnStatus.ok()) {
+      // Copy out endpoints if things went well
+      info.endpoints = resolvers[i]->GetEndpoints();
+    } else {
+      LOG_ERROR(kAsyncRuntime, << "Unabled to resolve endpoints for host: " << 
nodes[i].get_host()
+                                                               << " port: " << 
nodes[i].get_port());
     }
-  }
 
+    resolved_info.push_back(info);
+  }
   return resolved_info;
 }
 
-
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/872ca881/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h
index 7f0e572..09d8188 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h
@@ -19,6 +19,7 @@
 #define LIB_COMMON_UTIL_H_
 
 #include "hdfspp/status.h"
+#include "common/logging.h"
 
 #include <sstream>
 #include <mutex>
@@ -112,6 +113,73 @@ inline asio::ip::tcp::socket 
*get_asio_socket_ptr<asio::ip::tcp::socket>
 
 //Check if the high bit is set
 bool IsHighBitSet(uint64_t num);
+
+
+// Provide a way to do an atomic swap on a callback.
+// SetCallback, AtomicSwapCallback, and GetCallback can only be called once 
each.
+// AtomicSwapCallback and GetCallback must only be called after SetCallback.
+//
+// We can't throw on error, and since the callback is templated it's tricky to
+// generate generic dummy callbacks.  Complain loudly in the log and get good
+// test coverage.  It shouldn't be too hard to avoid invalid states.
+template <typename CallbackType>
+class SwappableCallbackHolder {
+ private:
+  std::mutex state_lock_;
+  CallbackType callback_;
+  bool callback_set_      = false;
+  bool callback_swapped_  = false;
+  bool callback_accessed_ = false;
+ public:
+  bool IsCallbackSet() {
+    mutex_guard swap_lock(state_lock_);
+    return callback_set_;
+  }
+
+  bool IsCallbackAccessed() {
+    mutex_guard swap_lock(state_lock_);
+    return callback_accessed_;
+  }
+
+  bool SetCallback(const CallbackType& callback) {
+    mutex_guard swap_lock(state_lock_);
+    if(callback_set_ || callback_swapped_ || callback_accessed_) {
+      LOG_ERROR(kAsyncRuntime, << "SetCallback violates access invariants.")
+      return false;
+    }
+    callback_ = callback;
+    callback_set_ = true;
+    return true;
+  }
+
+  CallbackType AtomicSwapCallback(const CallbackType& replacement, bool& 
swapped) {
+    mutex_guard swap_lock(state_lock_);
+    if(!callback_set_ || callback_swapped_) {
+      LOG_ERROR(kAsyncRuntime, << "AtomicSwapCallback violates access 
invariants.")
+      swapped = false;
+    } else if (callback_accessed_) {
+      // Common case where callback has been invoked but caller may not know
+      LOG_DEBUG(kAsyncRuntime, << "AtomicSwapCallback called after callback 
has been accessed");
+      return false;
+    }
+
+    CallbackType old = callback_;
+    callback_ = replacement;
+    callback_swapped_ = true;
+    swapped = true;
+    return old;
+  }
+  CallbackType GetCallback() {
+    mutex_guard swap_lock(state_lock_);
+    if(!callback_set_ || callback_accessed_) {
+      LOG_ERROR(kAsyncRuntime, << "GetCallback violates access invariants.")
+    }
+    callback_accessed_ = true;
+    return callback_;
+  }
+};
+
+
 }
 
 #endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/872ca881/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
index a5f3aad..b46102a 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
@@ -202,6 +202,7 @@ void FileSystemImpl::Connect(const std::string &server,
   LOG_INFO(kFileSystem, << "FileSystemImpl::Connect(" << FMT_THIS_ADDR
                         << ", server=" << server << ", service="
                         << service << ") called");
+  connect_callback_.SetCallback(handler);
 
   /* IoService::New can return nullptr */
   if (!io_service_) {
@@ -236,8 +237,8 @@ void FileSystemImpl::Connect(const std::string &server,
   }
 
 
-  nn_.Connect(cluster_name_, /*server, service*/ resolved_namenodes, [this, 
handler](const Status & s) {
-    handler(s, this);
+  nn_.Connect(cluster_name_, /*server, service*/ resolved_namenodes, 
[this](const Status & s) {
+    connect_callback_.GetCallback()(s, this);
   });
 }
 
@@ -286,6 +287,43 @@ int FileSystemImpl::WorkerThreadCount() {
   }
 }
 
+bool FileSystemImpl::CancelPendingConnect() {
+  if(!connect_callback_.IsCallbackSet()) {
+    LOG_DEBUG(kFileSystem, << "FileSystemImpl@" << this << 
"::CancelPendingConnect called before Connect started");
+    return false;
+  }
+  if(connect_callback_.IsCallbackAccessed()) {
+    LOG_DEBUG(kFileSystem, << "FileSystemImpl@" << this << 
"::CancelPendingConnect called after Connect completed");
+    return false;
+  }
+
+  // First invoke callback, then do proper teardown in RpcEngine and 
RpcConnection
+  ConnectCallback noop_callback = [](const Status &stat, FileSystem *fs) {
+    LOG_DEBUG(kFileSystem, << "Dummy callback invoked for canceled 
FileSystem@" << fs << "::Connect with status: " << stat.ToString());
+  };
+
+  bool callback_swapped = false;
+  ConnectCallback original_callback = 
connect_callback_.AtomicSwapCallback(noop_callback, callback_swapped);
+
+  if(callback_swapped) {
+    // Take original callback and invoke it as if it was canceled.
+    LOG_DEBUG(kFileSystem, << "Swapped in dummy callback.  Invoking connect 
callback with canceled status.");
+    std::function<void(void)> wrapped_callback = [original_callback, this](){
+      // handling code expected to check status before dereferenceing 'this'
+      original_callback(Status::Canceled(), this);
+    };
+    io_service_->PostTask(wrapped_callback);
+  } else {
+    LOG_INFO(kFileSystem, << "Unable to cancel FileSystem::Connect.  It hasn't 
been invoked yet or may have already completed.")
+    return false;
+  }
+
+  // Now push cancel down to clean up where possible and make sure the 
RpcEngine
+  // won't try to do retries in the background.  The rest of the memory cleanup
+  // happens when this FileSystem is deleted by the user.
+  return nn_.CancelPendingConnect();
+}
+
 void FileSystemImpl::Open(
     const std::string &path,
     const std::function<void(const Status &, FileHandle *)> &handler) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/872ca881/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
index 80978cf..fbc3967 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
@@ -46,6 +46,8 @@ namespace hdfs {
 class FileSystemImpl : public FileSystem {
 public:
   MEMCHECKED_CLASS(FileSystemImpl)
+  typedef std::function<void(const Status &, FileSystem *)> ConnectCallback;
+
   explicit FileSystemImpl(IoService *&io_service, const std::string& 
user_name, const Options &options);
   explicit FileSystemImpl(std::shared_ptr<IoService>, const std::string& 
user_name, const Options &options);
   ~FileSystemImpl() override;
@@ -61,6 +63,9 @@ public:
       const std::function<void(const Status &, FileSystem *)> &handler) 
override;
   virtual Status ConnectToDefaultFs() override;
 
+  /* Cancel connection if FS is in the middle of one */
+  virtual bool CancelPendingConnect() override;
+
   virtual void Open(const std::string &path,
                     const std::function<void(const Status &, FileHandle *)>
                         &handler) override;
@@ -197,6 +202,9 @@ private:
   NameNodeOperations nn_;
   std::shared_ptr<BadDataNodeTracker> bad_node_tracker_;
 
+  // Keep connect callback around in case it needs to be canceled
+  SwappableCallbackHolder<ConnectCallback> connect_callback_;
+
   /**
    * Runtime event monitoring handlers.
    * Note:  This is really handy to have for advanced usage but

http://git-wip-us.apache.org/repos/asf/hadoop/blob/872ca881/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.cc
index 89acac3..9e2d90a 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.cc
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.cc
@@ -45,6 +45,10 @@ void NameNodeOperations::Connect(const std::string 
&cluster_name,
   engine_.Connect(cluster_name, servers, handler);
 }
 
+bool NameNodeOperations::CancelPendingConnect() {
+  return engine_.CancelPendingConnect();
+}
+
 void NameNodeOperations::GetBlockLocations(const std::string & path, uint64_t 
offset, uint64_t length,
   std::function<void(const Status &, std::shared_ptr<const struct FileInfo>)> 
handler)
 {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/872ca881/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h
index 60efacc..59b3512 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h
@@ -52,6 +52,8 @@ public:
                const std::vector<ResolvedNamenodeInfo> &servers,
                std::function<void(const Status &)> &&handler);
 
+  bool CancelPendingConnect();
+
   void GetBlockLocations(const std::string & path, uint64_t offset, uint64_t 
length,
     std::function<void(const Status &, std::shared_ptr<const struct 
FileInfo>)> handler);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/872ca881/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
index 7c280b8..651225a 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
@@ -150,7 +150,8 @@ RpcEngine::RpcEngine(::asio::io_service *io_service, const 
Options &options,
       protocol_version_(protocol_version),
       call_id_(0),
       retry_timer(*io_service),
-      event_handlers_(std::make_shared<LibhdfsEvents>())
+      event_handlers_(std::make_shared<LibhdfsEvents>()),
+      connect_canceled_(false)
 {
   LOG_DEBUG(kRPC, << "RpcEngine::RpcEngine called");
 
@@ -182,6 +183,16 @@ void RpcEngine::Connect(const std::string &cluster_name,
   conn_->Connect(last_endpoints_, auth_info_, handler);
 }
 
+bool RpcEngine::CancelPendingConnect() {
+  if(connect_canceled_) {
+    LOG_DEBUG(kRPC, << "RpcEngine@" << this << "::CancelPendingConnect called 
more than once");
+    return false;
+  }
+
+  connect_canceled_ = true;
+  return true;
+}
+
 void RpcEngine::Shutdown() {
   LOG_DEBUG(kRPC, << "RpcEngine::Shutdown called");
   io_service_->post([this]() {
@@ -250,6 +261,14 @@ void RpcEngine::AsyncRpc(
 
   LOG_TRACE(kRPC, << "RpcEngine::AsyncRpc called");
 
+  // In case user-side code isn't checking the status of Connect before doing 
RPC
+  if(connect_canceled_) {
+    io_service_->post(
+        [handler](){ handler(Status::Canceled()); }
+    );
+    return;
+  }
+
   if (!conn_) {
     conn_ = InitializeConnection();
     conn_->ConnectAndFlush(last_endpoints_);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/872ca881/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
index 9191ab2..b4aef00 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
@@ -353,6 +353,8 @@ class RpcEngine : public LockFreeRpcEngine {
                const std::vector<ResolvedNamenodeInfo> servers,
                RpcCallback &handler);
 
+  bool CancelPendingConnect();
+
   void AsyncRpc(const std::string &method_name,
                 const ::google::protobuf::MessageLite *req,
                 const std::shared_ptr<::google::protobuf::MessageLite> &resp,
@@ -418,6 +420,9 @@ private:
 
   std::mutex engine_state_lock_;
 
+  // Once Connect has been canceled there is no going back
+  bool connect_canceled_;
+
   // Keep endpoint info for all HA connections, a non-null ptr indicates
   // that HA info was found in the configuation.
   std::unique_ptr<HANamenodeTracker> ha_persisted_info_;


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to