HDFS-11028: libhdfs++: FileSystem needs to be able to cancel pending connections. Contributed by James Clampffer
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/872ca881 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/872ca881 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/872ca881 Branch: refs/heads/HDFS-12996 Commit: 872ca88101473426a24c9db81c36926059e4e483 Parents: 996f140 Author: James <j...@apache.org> Authored: Mon Jan 23 18:19:13 2017 -0500 Committer: Hanisha Koneru <hanishakon...@apache.org> Committed: Mon Mar 26 11:11:03 2018 -0700 ---------------------------------------------------------------------- .../native/libhdfspp/examples/c/CMakeLists.txt | 1 + .../examples/c/connect_cancel/CMakeLists.txt | 35 ++++ .../examples/c/connect_cancel/connect_cancel.c | 107 +++++++++++++ .../libhdfspp/examples/cpp/CMakeLists.txt | 1 + .../examples/cpp/connect_cancel/CMakeLists.txt | 29 ++++ .../cpp/connect_cancel/connect_cancel.cc | 158 +++++++++++++++++++ .../native/libhdfspp/include/hdfspp/hdfs_ext.h | 31 ++++ .../native/libhdfspp/include/hdfspp/hdfspp.h | 6 + .../native/libhdfspp/lib/bindings/c/hdfs.cc | 108 +++++++++++++ .../libhdfspp/lib/common/continuation/asio.h | 86 ---------- .../lib/common/continuation/continuation.h | 12 -- .../libhdfspp/lib/common/hdfs_configuration.cc | 25 ++- .../libhdfspp/lib/common/namenode_info.cc | 123 ++++++++++----- .../src/main/native/libhdfspp/lib/common/util.h | 68 ++++++++ .../main/native/libhdfspp/lib/fs/filesystem.cc | 42 ++++- .../main/native/libhdfspp/lib/fs/filesystem.h | 8 + .../libhdfspp/lib/fs/namenode_operations.cc | 4 + .../libhdfspp/lib/fs/namenode_operations.h | 2 + .../main/native/libhdfspp/lib/rpc/rpc_engine.cc | 21 ++- .../main/native/libhdfspp/lib/rpc/rpc_engine.h | 5 + 20 files changed, 729 insertions(+), 143 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/872ca881/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/c/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/c/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/c/CMakeLists.txt index 76880cd..a73d2bc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/c/CMakeLists.txt +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/c/CMakeLists.txt @@ -17,3 +17,4 @@ # add_subdirectory(cat) +add_subdirectory(connect_cancel) http://git-wip-us.apache.org/repos/asf/hadoop/blob/872ca881/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/c/connect_cancel/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/c/connect_cancel/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/c/connect_cancel/CMakeLists.txt new file mode 100644 index 0000000..56aeeea --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/c/connect_cancel/CMakeLists.txt @@ -0,0 +1,35 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Default LIBHDFSPP_DIR to the default install location. You can override +# it by adding -DLIBHDFSPP_DIR=... to your cmake invocation +set(LIBHDFSPP_DIR CACHE STRING ${CMAKE_INSTALL_PREFIX}) + +include_directories( ${LIBHDFSPP_DIR}/include ) +link_directories( ${LIBHDFSPP_DIR}/lib ) + +add_executable(connect_cancel_c connect_cancel.c) +target_link_libraries(connect_cancel_c hdfspp uriparser2) + +# Several examples in different languages need to produce executables with +# same names. To allow executables with same names we keep their CMake +# names different, but specify their executable names as follows: +set_target_properties( connect_cancel_c + PROPERTIES + OUTPUT_NAME "connect_cancel_c" +) http://git-wip-us.apache.org/repos/asf/hadoop/blob/872ca881/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/c/connect_cancel/connect_cancel.c ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/c/connect_cancel/connect_cancel.c b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/c/connect_cancel/connect_cancel.c new file mode 100644 index 0000000..f6af6d1 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/c/connect_cancel/connect_cancel.c @@ -0,0 +1,107 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. 
See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +*/ + +/* + Attempt to connect to a cluster and use Control-C to bail out if it takes a while. + Valid config must be in environment variable $HADOOP_CONF_DIR +*/ + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <signal.h> +#include <unistd.h> + +#include "hdfspp/hdfs_ext.h" +#include "common/util_c.h" + +#define ERROR_BUFFER_SIZE 1024 + +// Global so signal handler can get at it +hdfsFS fs = NULL; + +const char *catch_enter = "In signal handler, going to try and cancel.\n"; +const char *catch_cancel = "hdfsCancelPendingConnect has been canceled in the signal handler.\n"; +const char *catch_exit = "Exiting the signal handler.\n"; + +// Print to stdout without calling malloc or otherwise indirectly modify userspace state. +// Write calls to stdout may still interleave with stuff coming from elsewhere. +static void sighandler_direct_stdout(const char *msg) { + if(!msg) + return; + ssize_t res = write(1 /*posix stdout fd*/, msg, strlen(msg)); + (void)res; +} + +static void sig_catch(int val) { + // Beware of calling things that aren't reentrant e.g. malloc while in a signal handler. 
+ sighandler_direct_stdout(catch_enter); + + if(fs) { + hdfsCancelPendingConnection(fs); + sighandler_direct_stdout(catch_cancel); + } + sighandler_direct_stdout(catch_exit); +} + + +int main(int argc, char** argv) { + hdfsSetLoggingLevel(HDFSPP_LOG_LEVEL_INFO); + signal(SIGINT, sig_catch); + + char error_text[ERROR_BUFFER_SIZE]; + if (argc != 1) { + fprintf(stderr, "usage: ./connect_cancel_c\n"); + ShutdownProtobufLibrary_C(); + exit(EXIT_FAILURE); + } + + const char *hdfsconfdir = getenv("HADOOP_CONF_DIR"); + if(!hdfsconfdir) { + fprintf(stderr, "$HADOOP_CONF_DIR must be set\n"); + ShutdownProtobufLibrary_C(); + exit(EXIT_FAILURE); + } + + struct hdfsBuilder* builder = hdfsNewBuilderFromDirectory(hdfsconfdir); + + fs = hdfsAllocateFileSystem(builder); + if (fs == NULL) { + hdfsGetLastError(error_text, ERROR_BUFFER_SIZE); + fprintf(stderr, "hdfsAllocateFileSystem returned null.\n%s\n", error_text); + hdfsFreeBuilder(builder); + ShutdownProtobufLibrary_C(); + exit(EXIT_FAILURE); + } + + int connected = hdfsConnectAllocated(fs, builder); + if (connected != 0) { + hdfsGetLastError(error_text, ERROR_BUFFER_SIZE); + fprintf(stderr, "hdfsConnectAllocated errored.\n%s\n", error_text); + hdfsFreeBuilder(builder); + ShutdownProtobufLibrary_C(); + exit(EXIT_FAILURE); + } + + hdfsDisconnect(fs); + hdfsFreeBuilder(builder); + // Clean up static data and prevent valgrind memory leaks + ShutdownProtobufLibrary_C(); + return 0; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/872ca881/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/CMakeLists.txt index 9e16b0b..8893f03 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/CMakeLists.txt +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/CMakeLists.txt @@ -19,3 +19,4 @@ add_subdirectory(cat) add_subdirectory(gendirs) add_subdirectory(find) +add_subdirectory(connect_cancel) http://git-wip-us.apache.org/repos/asf/hadoop/blob/872ca881/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/connect_cancel/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/connect_cancel/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/connect_cancel/CMakeLists.txt new file mode 100644 index 0000000..a029a99 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/connect_cancel/CMakeLists.txt @@ -0,0 +1,29 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Default LIBHDFSPP_DIR to the default install location. You can override +# it by adding -DLIBHDFSPP_DIR=... 
to your cmake invocation +set(LIBHDFSPP_DIR CACHE STRING ${CMAKE_INSTALL_PREFIX}) + +include_directories( ${LIBHDFSPP_DIR}/include ) +link_directories( ${LIBHDFSPP_DIR}/lib ) + +add_executable(connect_cancel connect_cancel.cc) +target_link_libraries(connect_cancel hdfspp) + + http://git-wip-us.apache.org/repos/asf/hadoop/blob/872ca881/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/connect_cancel/connect_cancel.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/connect_cancel/connect_cancel.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/connect_cancel/connect_cancel.cc new file mode 100644 index 0000000..7e5daa9 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/connect_cancel/connect_cancel.cc @@ -0,0 +1,158 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+*/ + + +#include "hdfspp/hdfspp.h" +#include "common/hdfs_configuration.h" +#include "common/configuration_loader.h" + +#include <google/protobuf/stubs/common.h> + +#include <signal.h> +#include <unistd.h> + +#include <thread> +#include <iostream> + +// Simple example of how to cancel an async connect call. +// Here Control-C (SIGINT) is caught in order to invoke the FS level cancel and +// properly tear down the process. Valgrind should show no leaked memory on exit +// when cancel has been called. URI parsing code is omitted and defaultFs from +// /etc/hadoop/conf or $HADOOP_CONF_DIR is always used. + +// Scoped globally to make it simple to reference from the signal handler. +std::shared_ptr<hdfs::FileSystem> fs; + +const std::string catch_enter("In signal handler, going to try and cancel FileSystem::Connect.\n"); +const std::string catch_cancel("FileSystem::Cancel has been canceled in the signal handler.\n"); +const std::string catch_exit("Exiting the signal handler.\n"); + +// Avoid IO reentrancy issues, see comments in signal handler below. +// It's possible that the write interleaves with another write call, +// but it won't corrupt the stack or heap. +static void sighandler_direct_stdout(const std::string &msg) { + ssize_t res = ::write(1 /*posix stdout FD*/, msg.data(), msg.size()); + // In production you'd want to check res, but error handling code will + // need to be fairly application specific if it's going to properly + // avoid reentrant calls to malloc. + (void)res; +} + +// Signal handler to make a SIGINT call cancel rather than exit(). +static void sig_catch(int val) { + (void)val; + // This is avoiding the tricky bits of signal handling, notably that the + // underlying string manipulation and IO functions used by the logger + // are unlikely to be reentrant. 
+ // + // Production code could mask out all logging on handler entry and enable + // it again on exit; here we just assume it's "good enough" and some + // (possibly broken) log messages are better than none. + + sighandler_direct_stdout(catch_enter); + if(fs) { + // This will invoke the callback immediately with an OperationCanceled status + fs->CancelPendingConnect(); + sighandler_direct_stdout(catch_cancel); + } + sighandler_direct_stdout(catch_exit); +} + + +int main(int arg_token_count, const char **args) { + (void)args; + if(arg_token_count != 1) { + std::cerr << "usage: ./connect_cancel"; + google::protobuf::ShutdownProtobufLibrary(); + exit(EXIT_FAILURE); + } + + // Register signal handle to asynchronously invoke cancel from outside the main thread. + signal(SIGINT, sig_catch); + + // Generic setup/config code much like the other examples. + hdfs::Options options; + //Setting the config path to the default: "$HADOOP_CONF_DIR" or "/etc/hadoop/conf" + hdfs::ConfigurationLoader loader; + //Loading default config files core-site.xml and hdfs-site.xml from the config path + hdfs::optional<hdfs::HdfsConfiguration> config = loader.LoadDefaultResources<hdfs::HdfsConfiguration>(); + //TODO: HDFS-9539 - after this is resolved, valid config will always be returned. 
+ if(config){ + //Loading options from the config + options = config->GetOptions(); + } + + + // Start an IoService and some worker threads + std::shared_ptr<hdfs::IoService> service = hdfs::IoService::MakeShared(); + if(nullptr == service) { + std::cerr << "Unable to create IoService" << std::endl; + fs.reset(); + // Nasty hack to clean up for valgrind since we don't have the C++17 optional<T>::reset method + config = decltype(config)(); + google::protobuf::ShutdownProtobufLibrary(); + exit(EXIT_FAILURE); + } + + unsigned int worker_count = service->InitDefaultWorkers(); + if(worker_count < 1) { + std::cerr << "Unable to create IoService worker threads"; + fs.reset(); + service->Stop(); + config = decltype(config)(); + google::protobuf::ShutdownProtobufLibrary(); + exit(EXIT_FAILURE); + } + + // Set up and connect to the FileSystem + fs.reset(hdfs::FileSystem::New(service, "", options)); + if(nullptr == fs) { + std::cerr << "Unable to create FileSystem" << std::endl; + fs.reset(); + service->Stop(); + config = decltype(config)(); + google::protobuf::ShutdownProtobufLibrary(); + exit(EXIT_FAILURE); + } + + hdfs::Status status = fs->ConnectToDefaultFs(); + if (!status.ok()) { + if(!options.defaultFS.get_host().empty()){ + std::cerr << "Error connecting to " << options.defaultFS << ". " << status.ToString() << std::endl; + } else { + std::cerr << "Error connecting to the cluster: defaultFS is empty. 
" << status.ToString() << std::endl; + } + fs.reset(); + service->Stop(); + config = decltype(config)(); + google::protobuf::ShutdownProtobufLibrary(); + exit(EXIT_FAILURE); + } + + fs.reset(); + service->Stop(); + config = decltype(config)(); + google::protobuf::ShutdownProtobufLibrary(); + + return 0; +} + + + + http://git-wip-us.apache.org/repos/asf/hadoop/blob/872ca881/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h index 72434e6..2b15cf4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h @@ -299,6 +299,7 @@ int hdfsPreAttachFileMonitor(libhdfspp_file_event_callback handler, int64_t cook * objects; NULL on error or empty result. * errno is set to non-zero on error or zero on success. 
**/ +LIBHDFS_EXTERNAL hdfsFileInfo * hdfsFind(hdfsFS fs, const char* path, const char* name, uint32_t * numEntries); @@ -314,6 +315,7 @@ hdfsFileInfo * hdfsFind(hdfsFS fs, const char* path, const char* name, uint32_t * @param name Name to be given to the created snapshot (may be NULL) * @return 0 on success, corresponding errno on failure **/ +LIBHDFS_EXTERNAL int hdfsCreateSnapshot(hdfsFS fs, const char* path, const char* name); /** @@ -324,6 +326,7 @@ int hdfsCreateSnapshot(hdfsFS fs, const char* path, const char* name); * @param name Name of the snapshot to be deleted (must be non-blank) * @return 0 on success, corresponding errno on failure **/ +LIBHDFS_EXTERNAL int hdfsDeleteSnapshot(hdfsFS fs, const char* path, const char* name); /** @@ -333,6 +336,7 @@ int hdfsDeleteSnapshot(hdfsFS fs, const char* path, const char* name); * @param path Path to the directory to be made snapshottable (must be non-blank) * @return 0 on success, corresponding errno on failure **/ +LIBHDFS_EXTERNAL int hdfsAllowSnapshot(hdfsFS fs, const char* path); /** @@ -342,8 +346,35 @@ int hdfsAllowSnapshot(hdfsFS fs, const char* path); * @param path Path to the directory to be made non-snapshottable (must be non-blank) * @return 0 on success, corresponding errno on failure **/ +LIBHDFS_EXTERNAL int hdfsDisallowSnapshot(hdfsFS fs, const char* path); +/** + * Create a FileSystem based on the builder but don't connect + * @param bld Used to populate config options in the same manner as hdfsBuilderConnect. + * Does not free builder. + **/ +LIBHDFS_EXTERNAL +hdfsFS hdfsAllocateFileSystem(struct hdfsBuilder *bld); + +/** + * Connect a FileSystem created with hdfsAllocateFileSystem + * @param fs A disconnected FS created with hdfsAllocateFileSystem + * @param bld The same or exact copy of the builder used for Allocate, we still need a few fields. + * Does not free builder. 
+ * @return 0 on success, corresponding errno on failure + **/ +LIBHDFS_EXTERNAL +int hdfsConnectAllocated(hdfsFS fs, struct hdfsBuilder *bld); + +/** + * Cancel a pending connection on a FileSystem + * @param fs A fs in the process of connecting using hdfsConnectAllocated in another thread. + * @return 0 on success, corresponding errno on failure + **/ +LIBHDFS_EXTERNAL +int hdfsCancelPendingConnection(hdfsFS fs); + #ifdef __cplusplus } /* end extern "C" */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/872ca881/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h index 5fbd3d8..46a1e61 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h @@ -217,6 +217,12 @@ class FileSystem { virtual Status ConnectToDefaultFs() = 0; /** + * Cancels any attempts to connect to the HDFS cluster. + * FileSystem is expected to be destroyed after invoking this. + */ + virtual bool CancelPendingConnect() = 0; + + /** * Open a file on HDFS. The call issues an RPC to the NameNode to * gather the locations of all blocks in the file and to return a * new instance of the @ref InputStream object. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/872ca881/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc index dd7d00c..0608244 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc @@ -332,6 +332,94 @@ hdfsFS doHdfsConnect(optional<std::string> nn, optional<tPort> port, optional<st } } +hdfsFS hdfsAllocateFileSystem(struct hdfsBuilder *bld) { + // Same idea as the first half of doHdfsConnect, but return the wrapped FS before + // connecting. + try { + errno = 0; + std::shared_ptr<IoService> io_service = IoService::MakeShared(); + + int io_thread_count = bld->config.GetOptions().io_threads_; + if(io_thread_count < 1) { + io_service->InitDefaultWorkers(); + } else { + io_service->InitWorkers(io_thread_count); + } + + FileSystem *fs = FileSystem::New(io_service, bld->user.value_or(""), bld->config.GetOptions()); + if (!fs) { + ReportError(ENODEV, "Could not create FileSystem object"); + return nullptr; + } + + if (fsEventCallback) { + fs->SetFsEventCallback(fsEventCallback.value()); + } + + return new hdfs_internal(fs); + } catch (const std::exception &e) { + ReportException(e); + return nullptr; + } catch (...) 
{ + ReportCaughtNonException(); + return nullptr; + } + return nullptr; +} + +int hdfsConnectAllocated(hdfsFS fs, struct hdfsBuilder *bld) { + if(!CheckSystem(fs)) { + return ENODEV; + } + + if(!bld) { + ReportError(ENODEV, "No hdfsBuilder object supplied"); + return ENODEV; + } + + // Get C++ FS to do connect + FileSystem *fsImpl = fs->get_impl(); + if(!fsImpl) { + ReportError(ENODEV, "Null FileSystem implementation"); + return ENODEV; + } + + // Unpack the required bits of the hdfsBuilder + optional<std::string> nn = bld->overrideHost; + optional<tPort> port = bld->overridePort; + optional<std::string> user = bld->user; + + // try-catch in case some of the third-party stuff throws + try { + Status status; + if (nn || port) { + if (!port) { + port = kDefaultPort; + } + std::string port_as_string = std::to_string(*port); + status = fsImpl->Connect(nn.value_or(""), port_as_string); + } else { + status = fsImpl->ConnectToDefaultFs(); + } + + if (!status.ok()) { + Error(status); + return ENODEV; + } + + // 0 to indicate a good connection + return 0; + } catch (const std::exception & e) { + ReportException(e); + return ENODEV; + } catch (...) 
{ + ReportCaughtNonException(); + return ENODEV; + } + + return 0; +} + hdfsFS hdfsConnect(const char *nn, tPort port) { return hdfsConnectAsUser(nn, port, ""); } @@ -350,6 +438,26 @@ hdfsFS hdfsConnectNewInstance(const char* nn, tPort port) { return hdfsConnectAsUser(nn, port, ""); } +int hdfsCancelPendingConnection(hdfsFS fs) { + // todo: stick an enum in hdfs_internal to check the connect state + if(!CheckSystem(fs)) { + return ENODEV; + } + + FileSystem *fsImpl = fs->get_impl(); + if(!fsImpl) { + ReportError(ENODEV, "Null FileSystem implementation"); + return ENODEV; + } + + bool canceled = fsImpl->CancelPendingConnect(); + if(canceled) { + return 0; + } else { + return EINTR; + } +} + int hdfsDisconnect(hdfsFS fs) { try { http://git-wip-us.apache.org/repos/asf/hadoop/blob/872ca881/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/asio.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/asio.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/asio.h index 3f650ce..193358f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/asio.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/asio.h @@ -34,24 +34,6 @@ namespace asio_continuation { using namespace continuation; -template <class Stream, class MutableBufferSequence> -class ReadContinuation : public Continuation { -public: - ReadContinuation(std::shared_ptr<Stream>& stream, const MutableBufferSequence &buffer) - : stream_(stream), buffer_(buffer) {} - virtual void Run(const Next &next) override { - auto handler = - [next](const asio::error_code &ec, size_t) { next(ToStatus(ec)); }; - asio::async_read(*stream_, buffer_, handler); - } - -private: - // prevent construction from raw ptr - 
ReadContinuation(Stream *stream, MutableBufferSequence &buffer); - std::shared_ptr<Stream> stream_; - MutableBufferSequence buffer_; -}; - template <class Stream, class ConstBufferSequence> class WriteContinuation : public Continuation { public: @@ -71,80 +53,12 @@ private: ConstBufferSequence buffer_; }; -template <class Socket, class Iterator> -class ConnectContinuation : public Continuation { -public: - ConnectContinuation(Socket *socket, Iterator begin, Iterator end, - Iterator *connected_endpoint) - : socket_(socket), begin_(begin), end_(end), - connected_endpoint_(connected_endpoint) {} - - virtual void Run(const Next &next) override { - auto handler = [this, next](const asio::error_code &ec, Iterator it) { - if (connected_endpoint_) { - *connected_endpoint_ = it; - } - next(ToStatus(ec)); - }; - asio::async_connect(*socket_, begin_, end_, handler); - } - -private: - Socket *socket_; - Iterator begin_; - Iterator end_; - Iterator *connected_endpoint_; -}; - -template <class OutputIterator> -class ResolveContinuation : public Continuation { -public: - ResolveContinuation(::asio::io_service *io_service, const std::string &server, - const std::string &service, OutputIterator result) - : resolver_(*io_service), query_(server, service), result_(result) {} - - virtual void Run(const Next &next) override { - using resolver = ::asio::ip::tcp::resolver; - auto handler = - [this, next](const asio::error_code &ec, resolver::iterator it) { - if (!ec) { - std::copy(it, resolver::iterator(), result_); - } - next(ToStatus(ec)); - }; - resolver_.async_resolve(query_, handler); - } - -private: - ::asio::ip::tcp::resolver resolver_; - ::asio::ip::tcp::resolver::query query_; - OutputIterator result_; -}; - template <class Stream, class ConstBufferSequence> static inline Continuation *Write(std::shared_ptr<Stream> stream, const ConstBufferSequence &buffer) { return new WriteContinuation<Stream, ConstBufferSequence>(stream, buffer); } -template <class Stream, class 
MutableBufferSequence> -static inline Continuation *Read(std::shared_ptr<Stream> stream, - const MutableBufferSequence &buffer) { - return new ReadContinuation<Stream, MutableBufferSequence>(stream, buffer); -} - -template <class Socket, class Iterator> -static inline Continuation *Connect(Socket *socket, Iterator begin, - Iterator end) { - return new ConnectContinuation<Socket, Iterator>(socket, begin, end, nullptr); -} - -template <class OutputIterator> -static inline Continuation * -Resolve(::asio::io_service *io_service, const std::string &server, - const std::string &service, OutputIterator result) { - return new ResolveContinuation<OutputIterator>(io_service, server, service, result); -} } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/872ca881/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h index 4c9b8ad..18aa146 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h @@ -131,18 +131,6 @@ template <class State> inline void Pipeline<State>::Run(UserHandler &&handler) { Schedule(Status::OK()); } -template <class Handler> class BindContinuation : public Continuation { -public: - BindContinuation(const Handler &handler) : handler_(handler) {} - virtual void Run(const Next &next) override { handler_(next); } - -private: - Handler handler_; -}; - -template <class Handler> static inline Continuation *Bind(const Handler &handler) { - return new BindContinuation<Handler>(handler); -} } } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/872ca881/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.cc index 6778bad..ab87506 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.cc @@ -71,6 +71,17 @@ std::vector<std::string> SplitOnComma(const std::string &s, bool include_empty_s return res; } +std::string RemoveSpaces(const std::string &str) { + std::string res; + for(unsigned int i=0; i<str.size(); i++) { + char curr = str[i]; + if(curr != ' ') { + res += curr; + } + } + return res; +} + // Prepend hdfs:// to string if there isn't already a scheme // Converts unset optional into empty string std::string PrependHdfsScheme(optional<std::string> str) { @@ -92,6 +103,8 @@ struct ha_parse_error : public std::exception { }; std::vector<NamenodeInfo> HdfsConfiguration::LookupNameService(const std::string &nameservice) { + LOG_TRACE(kRPC, << "HDFSConfiguration@" << this << "::LookupNameService( nameservice=" << nameservice<< " ) called"); + std::vector<NamenodeInfo> namenodes; try { // Find namenodes that belong to nameservice @@ -104,8 +117,10 @@ std::vector<NamenodeInfo> HdfsConfiguration::LookupNameService(const std::string else throw ha_parse_error("unable to find " + service_nodes); - for(auto it=namenode_ids.begin(); it != namenode_ids.end(); it++) - LOG_INFO(kRPC, << "Namenode: " << *it); + for(unsigned int i=0; i<namenode_ids.size(); i++) { + namenode_ids[i] = RemoveSpaces(namenode_ids[i]); + LOG_INFO(kRPC, << "Namenode: " << 
namenode_ids[i]); + } } // should this error if we only find 1 NN? @@ -123,7 +138,11 @@ std::vector<NamenodeInfo> HdfsConfiguration::LookupNameService(const std::string } URI uri = node_uri.value(); - LOG_INFO(kRPC, << "Read the following HA Namenode URI from config" << uri.GetDebugString()); + if(uri.str() == "") { + LOG_WARN(kRPC, << "Attempted to read info for nameservice " << nameservice << " node " << dom_node_name << " but didn't find anything.") + } else { + LOG_INFO(kRPC, << "Read the following HA Namenode URI from config" << uri.GetDebugString()); + } NamenodeInfo node(nameservice, *node_id, uri); namenodes.push_back(node); http://git-wip-us.apache.org/repos/asf/hadoop/blob/872ca881/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.cc index bc38be7..bd43091 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.cc @@ -18,7 +18,7 @@ #include "namenode_info.h" -#include "common/continuation/asio.h" +#include "common/util.h" #include "common/logging.h" #include <sstream> @@ -71,62 +71,107 @@ bool ResolveInPlace(::asio::io_service *ioservice, ResolvedNamenodeInfo &info) { return true; } +typedef std::vector<asio::ip::tcp::endpoint> endpoint_vector; + +// RAII wrapper +class ScopedResolver { + private: + ::asio::io_service *io_service_; + std::string host_; + std::string port_; + ::asio::ip::tcp::resolver::query query_; + ::asio::ip::tcp::resolver resolver_; + endpoint_vector endpoints_; + + // Caller blocks on access if resolution isn't finished + 
std::shared_ptr<std::promise<Status>> result_status_; + public: + ScopedResolver(::asio::io_service *service, const std::string &host, const std::string &port) : + io_service_(service), host_(host), port_(port), query_(host, port), resolver_(*io_service_) + { + if(!io_service_) + LOG_ERROR(kAsyncRuntime, << "ScopedResolver@" << this << " passed nullptr to io_service"); + } -std::vector<ResolvedNamenodeInfo> BulkResolve(::asio::io_service *ioservice, const std::vector<NamenodeInfo> &nodes) { - using namespace asio_continuation; + ~ScopedResolver() { + resolver_.cancel(); + } - typedef std::vector<asio::ip::tcp::endpoint> endpoint_vector; - typedef Pipeline<endpoint_vector> resolve_pipeline_t; + bool BeginAsyncResolve() { + // result_status_ would only exist if this was previously called. Invalid state. + if(result_status_) { + LOG_ERROR(kAsyncRuntime, << "ScopedResolver@" << this << "::BeginAsyncResolve invalid call: may only be called once per instance"); + return false; + } else if(!io_service_) { + LOG_ERROR(kAsyncRuntime, << "ScopedResolver@" << this << "::BeginAsyncResolve invalid call: null io_service"); + return false; + } + // Now set up the promise, set it in async_resolve's callback + result_status_ = std::make_shared<std::promise<Status>>(); + + // Callback to pull a copy of endpoints out of resolver and set promise + auto callback = [this](const asio::error_code &ec, ::asio::ip::tcp::resolver::iterator out) { + if(!ec) { + std::copy(out, ::asio::ip::tcp::resolver::iterator(), std::back_inserter(endpoints_)); + } + result_status_->set_value( ToStatus(ec) ); + }; + resolver_.async_resolve(query_, callback); + return true; + } - std::vector<std::pair<resolve_pipeline_t*, std::shared_ptr<std::promise<Status>>>> pipelines; - pipelines.reserve(nodes.size()); + Status Join() { + if(!result_status_) { + std::ostringstream errmsg; + errmsg << "ScopedResolver@" << this << "Join invalid call: promise never set"; + return 
Status::InvalidArgument(errmsg.str().c_str()); + } + + std::future<Status> future_result = result_status_->get_future(); + Status res = future_result.get(); + return res; + } + + endpoint_vector GetEndpoints() { + // Explicitly return by value to decouple lifecycles. + return endpoints_; + } +}; + +std::vector<ResolvedNamenodeInfo> BulkResolve(::asio::io_service *ioservice, const std::vector<NamenodeInfo> &nodes) { + std::vector< std::unique_ptr<ScopedResolver> > resolvers; + resolvers.reserve(nodes.size()); std::vector<ResolvedNamenodeInfo> resolved_info; - // This must never reallocate once async ops begin resolved_info.reserve(nodes.size()); for(unsigned int i=0; i<nodes.size(); i++) { std::string host = nodes[i].get_host(); std::string port = nodes[i].get_port(); - ResolvedNamenodeInfo resolved; - resolved = nodes[i]; - resolved_info.push_back(resolved); - - // build the pipeline - resolve_pipeline_t *pipeline = resolve_pipeline_t::Create(); - auto resolve_step = Resolve(ioservice, host, port, std::back_inserter(pipeline->state())); - pipeline->Push(resolve_step); - - // make a status associated with current pipeline - std::shared_ptr<std::promise<Status>> active_stat = std::make_shared<std::promise<Status>>(); - pipelines.push_back(std::make_pair(pipeline, active_stat)); - - pipeline->Run([i,active_stat, &resolved_info](const Status &s, const endpoint_vector &ends){ - resolved_info[i].endpoints = ends; - active_stat->set_value(s); - }); - + resolvers.emplace_back(new ScopedResolver(ioservice, host, port)); + resolvers[i]->BeginAsyncResolve(); } // Join all async operations - std::vector<ResolvedNamenodeInfo> return_set; - for(unsigned int i=0; i<pipelines.size();i++) { - std::shared_ptr<std::promise<Status>> promise = pipelines[i].second; - - std::future<Status> future = promise->get_future(); - Status stat = future.get(); - - // Clear endpoints if we hit an error - if(!stat.ok()) { - LOG_WARN(kRPC, << "Unable to resolve endpoints for " << nodes[i].uri.str()); 
- resolved_info[i].endpoints.clear(); + for(unsigned int i=0; i < resolvers.size(); i++) { + Status asyncReturnStatus = resolvers[i]->Join(); + + ResolvedNamenodeInfo info; + info = nodes[i]; + + if(asyncReturnStatus.ok()) { + // Copy out endpoints if things went well + info.endpoints = resolvers[i]->GetEndpoints(); + } else { + LOG_ERROR(kAsyncRuntime, << "Unabled to resolve endpoints for host: " << nodes[i].get_host() + << " port: " << nodes[i].get_port()); } - } + resolved_info.push_back(info); + } return resolved_info; } - } http://git-wip-us.apache.org/repos/asf/hadoop/blob/872ca881/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h index 7f0e572..09d8188 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h @@ -19,6 +19,7 @@ #define LIB_COMMON_UTIL_H_ #include "hdfspp/status.h" +#include "common/logging.h" #include <sstream> #include <mutex> @@ -112,6 +113,73 @@ inline asio::ip::tcp::socket *get_asio_socket_ptr<asio::ip::tcp::socket> //Check if the high bit is set bool IsHighBitSet(uint64_t num); + + +// Provide a way to do an atomic swap on a callback. +// SetCallback, AtomicSwapCallback, and GetCallback can only be called once each. +// AtomicSwapCallback and GetCallback must only be called after SetCallback. +// +// We can't throw on error, and since the callback is templated it's tricky to +// generate generic dummy callbacks. Complain loudly in the log and get good +// test coverage. It shouldn't be too hard to avoid invalid states. 
+template <typename CallbackType> +class SwappableCallbackHolder { + private: + std::mutex state_lock_; + CallbackType callback_; + bool callback_set_ = false; + bool callback_swapped_ = false; + bool callback_accessed_ = false; + public: + bool IsCallbackSet() { + mutex_guard swap_lock(state_lock_); + return callback_set_; + } + + bool IsCallbackAccessed() { + mutex_guard swap_lock(state_lock_); + return callback_accessed_; + } + + bool SetCallback(const CallbackType& callback) { + mutex_guard swap_lock(state_lock_); + if(callback_set_ || callback_swapped_ || callback_accessed_) { + LOG_ERROR(kAsyncRuntime, << "SetCallback violates access invariants.") + return false; + } + callback_ = callback; + callback_set_ = true; + return true; + } + + CallbackType AtomicSwapCallback(const CallbackType& replacement, bool& swapped) { + mutex_guard swap_lock(state_lock_); + if(!callback_set_ || callback_swapped_) { + LOG_ERROR(kAsyncRuntime, << "AtomicSwapCallback violates access invariants.") + swapped = false; + } else if (callback_accessed_) { + // Common case where callback has been invoked but caller may not know + LOG_DEBUG(kAsyncRuntime, << "AtomicSwapCallback called after callback has been accessed"); + return false; + } + + CallbackType old = callback_; + callback_ = replacement; + callback_swapped_ = true; + swapped = true; + return old; + } + CallbackType GetCallback() { + mutex_guard swap_lock(state_lock_); + if(!callback_set_ || callback_accessed_) { + LOG_ERROR(kAsyncRuntime, << "GetCallback violates access invariants.") + } + callback_accessed_ = true; + return callback_; + } +}; + + } #endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/872ca881/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc index a5f3aad..b46102a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc @@ -202,6 +202,7 @@ void FileSystemImpl::Connect(const std::string &server, LOG_INFO(kFileSystem, << "FileSystemImpl::Connect(" << FMT_THIS_ADDR << ", server=" << server << ", service=" << service << ") called"); + connect_callback_.SetCallback(handler); /* IoService::New can return nullptr */ if (!io_service_) { @@ -236,8 +237,8 @@ void FileSystemImpl::Connect(const std::string &server, } - nn_.Connect(cluster_name_, /*server, service*/ resolved_namenodes, [this, handler](const Status & s) { - handler(s, this); + nn_.Connect(cluster_name_, /*server, service*/ resolved_namenodes, [this](const Status & s) { + connect_callback_.GetCallback()(s, this); }); } @@ -286,6 +287,43 @@ int FileSystemImpl::WorkerThreadCount() { } } +bool FileSystemImpl::CancelPendingConnect() { + if(!connect_callback_.IsCallbackSet()) { + LOG_DEBUG(kFileSystem, << "FileSystemImpl@" << this << "::CancelPendingConnect called before Connect started"); + return false; + } + if(connect_callback_.IsCallbackAccessed()) { + LOG_DEBUG(kFileSystem, << "FileSystemImpl@" << this << "::CancelPendingConnect called after Connect completed"); + return false; + } + + // First invoke callback, then do proper teardown in RpcEngine and RpcConnection + ConnectCallback noop_callback = [](const Status &stat, FileSystem *fs) { + LOG_DEBUG(kFileSystem, << "Dummy callback invoked for canceled FileSystem@" << fs << "::Connect with status: " << stat.ToString()); + }; + + bool callback_swapped = false; + ConnectCallback original_callback = connect_callback_.AtomicSwapCallback(noop_callback, callback_swapped); + + if(callback_swapped) { + // Take original callback and invoke it as if it was canceled. 
+ LOG_DEBUG(kFileSystem, << "Swapped in dummy callback. Invoking connect callback with canceled status."); + std::function<void(void)> wrapped_callback = [original_callback, this](){ + // handling code expected to check status before dereferenceing 'this' + original_callback(Status::Canceled(), this); + }; + io_service_->PostTask(wrapped_callback); + } else { + LOG_INFO(kFileSystem, << "Unable to cancel FileSystem::Connect. It hasn't been invoked yet or may have already completed.") + return false; + } + + // Now push cancel down to clean up where possible and make sure the RpcEngine + // won't try to do retries in the background. The rest of the memory cleanup + // happens when this FileSystem is deleted by the user. + return nn_.CancelPendingConnect(); +} + void FileSystemImpl::Open( const std::string &path, const std::function<void(const Status &, FileHandle *)> &handler) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/872ca881/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h index 80978cf..fbc3967 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h @@ -46,6 +46,8 @@ namespace hdfs { class FileSystemImpl : public FileSystem { public: MEMCHECKED_CLASS(FileSystemImpl) + typedef std::function<void(const Status &, FileSystem *)> ConnectCallback; + explicit FileSystemImpl(IoService *&io_service, const std::string& user_name, const Options &options); explicit FileSystemImpl(std::shared_ptr<IoService>, const std::string& user_name, const Options &options); ~FileSystemImpl() override; @@ -61,6 +63,9 @@ public: 
const std::function<void(const Status &, FileSystem *)> &handler) override; virtual Status ConnectToDefaultFs() override; + /* Cancel connection if FS is in the middle of one */ + virtual bool CancelPendingConnect() override; + virtual void Open(const std::string &path, const std::function<void(const Status &, FileHandle *)> &handler) override; @@ -197,6 +202,9 @@ private: NameNodeOperations nn_; std::shared_ptr<BadDataNodeTracker> bad_node_tracker_; + // Keep connect callback around in case it needs to be canceled + SwappableCallbackHolder<ConnectCallback> connect_callback_; + /** * Runtime event monitoring handlers. * Note: This is really handy to have for advanced usage but http://git-wip-us.apache.org/repos/asf/hadoop/blob/872ca881/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.cc index 89acac3..9e2d90a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.cc @@ -45,6 +45,10 @@ void NameNodeOperations::Connect(const std::string &cluster_name, engine_.Connect(cluster_name, servers, handler); } +bool NameNodeOperations::CancelPendingConnect() { + return engine_.CancelPendingConnect(); +} + void NameNodeOperations::GetBlockLocations(const std::string & path, uint64_t offset, uint64_t length, std::function<void(const Status &, std::shared_ptr<const struct FileInfo>)> handler) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/872ca881/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h 
---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h index 60efacc..59b3512 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h @@ -52,6 +52,8 @@ public: const std::vector<ResolvedNamenodeInfo> &servers, std::function<void(const Status &)> &&handler); + bool CancelPendingConnect(); + void GetBlockLocations(const std::string & path, uint64_t offset, uint64_t length, std::function<void(const Status &, std::shared_ptr<const struct FileInfo>)> handler); http://git-wip-us.apache.org/repos/asf/hadoop/blob/872ca881/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc index 7c280b8..651225a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc @@ -150,7 +150,8 @@ RpcEngine::RpcEngine(::asio::io_service *io_service, const Options &options, protocol_version_(protocol_version), call_id_(0), retry_timer(*io_service), - event_handlers_(std::make_shared<LibhdfsEvents>()) + event_handlers_(std::make_shared<LibhdfsEvents>()), + connect_canceled_(false) { LOG_DEBUG(kRPC, << "RpcEngine::RpcEngine called"); @@ -182,6 +183,16 @@ void RpcEngine::Connect(const std::string &cluster_name, conn_->Connect(last_endpoints_, auth_info_, 
handler); } +bool RpcEngine::CancelPendingConnect() { + if(connect_canceled_) { + LOG_DEBUG(kRPC, << "RpcEngine@" << this << "::CancelPendingConnect called more than once"); + return false; + } + + connect_canceled_ = true; + return true; +} + void RpcEngine::Shutdown() { LOG_DEBUG(kRPC, << "RpcEngine::Shutdown called"); io_service_->post([this]() { @@ -250,6 +261,14 @@ void RpcEngine::AsyncRpc( LOG_TRACE(kRPC, << "RpcEngine::AsyncRpc called"); + // In case user-side code isn't checking the status of Connect before doing RPC + if(connect_canceled_) { + io_service_->post( + [handler](){ handler(Status::Canceled()); } + ); + return; + } + if (!conn_) { conn_ = InitializeConnection(); conn_->ConnectAndFlush(last_endpoints_); http://git-wip-us.apache.org/repos/asf/hadoop/blob/872ca881/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h index 9191ab2..b4aef00 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h @@ -353,6 +353,8 @@ class RpcEngine : public LockFreeRpcEngine { const std::vector<ResolvedNamenodeInfo> servers, RpcCallback &handler); + bool CancelPendingConnect(); + void AsyncRpc(const std::string &method_name, const ::google::protobuf::MessageLite *req, const std::shared_ptr<::google::protobuf::MessageLite> &resp, @@ -418,6 +420,9 @@ private: std::mutex engine_state_lock_; + // Once Connect has been canceled there is no going back + bool connect_canceled_; + // Keep endpoint info for all HA connections, a non-null ptr indicates // that HA info was found in the configuation. 
std::unique_ptr<HANamenodeTracker> ha_persisted_info_; --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org