HDFS-11294: libhdfs++: Segfault in HA failover if DNS lookup for both Namenodes fails. Contributed by James Clampffer.
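The failure mode, in short: when DNS lookup fails for both Namenodes the RPC engine holds an empty list of resolved endpoints, and the old failover path blindly indexed element 0 of that empty std::vector (undefined behavior, observed as a segfault). The fix changes HANamenodeTracker::GetFailoverAndUpdate to take the whole endpoint vector and return a bool so RpcEngine can fail the pending RPCs cleanly instead of crashing. A minimal standalone sketch of that shape change, using illustrative stand-in types rather than the real libhdfs++ classes:

    // Sketch only: Endpoint stands in for asio::ip::tcp::endpoint and
    // NamenodeInfo for ResolvedNamenodeInfo; the bool-returning signature
    // mirrors the new GetFailoverAndUpdate in namenode_tracker.h.
    #include <iostream>
    #include <string>
    #include <vector>

    struct Endpoint { std::string host; unsigned short port = 0; };
    struct NamenodeInfo { std::string uri; std::vector<Endpoint> endpoints; };

    // The old shape forced callers to write GetFailoverAndUpdate(eps[0]),
    // which is undefined behavior when eps is empty.
    bool GetFailoverAndUpdate(const std::vector<Endpoint>& current,
                              NamenodeInfo& out) {
      if (current.empty()) {
        // No key to look the failover node up with; report instead of crashing.
        std::cerr << "GetFailoverAndUpdate requires at least 1 endpoint\n";
        return false;
      }
      // ... the real code swaps active/standby keyed on current[0] here ...
      out = NamenodeInfo{"hdfs://other-nn.example:8020", {}};  // placeholder
      return true;
    }

    int main() {
      std::vector<Endpoint> resolved;  // empty: DNS failed for both Namenodes
      NamenodeInfo next;
      if (!GetFailoverAndUpdate(resolved, next)) {
        // Mirrors RpcEngine::RpcCommsError in this patch: post
        // Status::Error("No endpoints found for namenode") to every pending
        // request and clear the queue; nothing is dereferenced.
        std::cerr << "no failover target; abort pending RPCs\n";
      }
      return 0;
    }

The new hdfs_config_connect_bugs.cc test below drives exactly this path by pointing both nn1 and nn2 at nonesuch hostnames that are expected not to resolve.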
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/48db24a4 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/48db24a4 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/48db24a4 Branch: refs/heads/HDFS-7240 Commit: 48db24a430f5911a564d08f9ae927aaa9d81cd50 Parents: 12942f6 Author: James <j...@apache.org> Authored: Mon Oct 30 12:07:02 2017 -0400 Committer: James Clampffer <james.clampf...@hp.com> Committed: Thu Mar 22 17:19:47 2018 -0400 ---------------------------------------------------------------------- .../native/libhdfspp/include/hdfspp/events.h | 14 +- .../native/libhdfspp/lib/bindings/c/hdfs.cc | 90 +++++++++++- .../libhdfspp/lib/rpc/namenode_tracker.cc | 66 +++++---- .../native/libhdfspp/lib/rpc/namenode_tracker.h | 9 +- .../main/native/libhdfspp/lib/rpc/rpc_engine.cc | 32 ++++- .../main/native/libhdfspp/tests/CMakeLists.txt | 12 +- .../libhdfspp/tests/hdfs_config_connect_bugs.cc | 136 +++++++++++++++++++ .../native/libhdfspp/tests/rpc_engine_test.cc | 16 ++- 8 files changed, 325 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/48db24a4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/events.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/events.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/events.h index 496703a..83c0deb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/events.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/events.h @@ -37,17 +37,23 @@ static constexpr const char * FS_NN_CONNECT_EVENT = "NN::connect"; static constexpr const char * FS_NN_READ_EVENT = "NN::read"; static constexpr const char * FS_NN_WRITE_EVENT = "NN::write"; +static constexpr const char * FILE_DN_CONNECT_EVENT = "DN::connect"; +static constexpr const char * FILE_DN_READ_EVENT = "DN::read"; +static constexpr const char * FILE_DN_WRITE_EVENT = "DN::write"; + + // NN failover event due to issues with the current NN; might be standby, might be dead. // Invokes the fs_event_callback using the nameservice name in the cluster string. // The uint64_t value argument holds an address that can be reinterpreted as a const char * // and provides the full URI of the node the failover will attempt to connect to next. static constexpr const char * FS_NN_FAILOVER_EVENT = "NN::failover"; -static constexpr const char * FILE_DN_CONNECT_EVENT = "DN::connect"; -static constexpr const char * FILE_DN_READ_EVENT = "DN::read"; -static constexpr const char * FILE_DN_WRITE_EVENT = "DN::write"; - +// Invoked when RpcConnection tries to use an empty set of endpoints to figure out +// which NN in a HA cluster to connect to. +static constexpr const char * FS_NN_EMPTY_ENDPOINTS_EVENT = "NN::bad_failover::no_endpoints"; +// Invoked prior to determining if failed NN rpc calls should be retried or discarded. 
+static constexpr const char * FS_NN_PRE_RPC_RETRY_EVENT = "NN::rpc::get_retry_action"; class event_response { public: http://git-wip-us.apache.org/repos/asf/hadoop/blob/48db24a4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc index 3e9ee48..9a7c8b4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc @@ -41,6 +41,12 @@ using namespace std::placeholders; static constexpr tPort kDefaultPort = 8020; +/** Annotate what parts of the code below are implementations of API functions + * and if they are normal vs. extended API. + */ +#define LIBHDFS_C_API +#define LIBHDFSPP_EXT_API + /* Separate the handles used by the C api from the C++ API*/ struct hdfs_internal { hdfs_internal(FileSystem *p) : filesystem_(p), working_directory_("/") {} @@ -79,6 +85,7 @@ struct hdfsFile_internal { thread_local std::string errstr; /* Fetch last error that happened in this thread */ +LIBHDFSPP_EXT_API int hdfsGetLastError(char *buf, int len) { //No error message if(errstr.empty()){ @@ -255,6 +262,7 @@ optional<std::string> getAbsolutePath(hdfsFS fs, const char* path) { * C API implementations **/ +LIBHDFS_C_API int hdfsFileIsOpenForRead(hdfsFile file) { /* files can only be open for reads at the moment, do a quick check */ if (!CheckHandle(file)){ @@ -263,6 +271,7 @@ int hdfsFileIsOpenForRead(hdfsFile file) { return 1; // Update implementation when we get file writing } +LIBHDFS_C_API int hdfsFileIsOpenForWrite(hdfsFile file) { /* files can only be open for reads at the moment, so return false */ CheckHandle(file); @@ -332,6 +341,7 @@ hdfsFS doHdfsConnect(optional<std::string> nn, optional<tPort> port, optional<st } } +LIBHDFSPP_EXT_API hdfsFS hdfsAllocateFileSystem(struct hdfsBuilder *bld) { // Same idea as the first half of doHdfsConnect, but return the wrapped FS before // connecting.
@@ -367,6 +377,7 @@ hdfsFS hdfsAllocateFileSystem(struct hdfsBuilder *bld) { return nullptr; } +LIBHDFSPP_EXT_API int hdfsConnectAllocated(hdfsFS fs, struct hdfsBuilder *bld) { if(!CheckSystem(fs)) { return ENODEV; @@ -420,24 +431,29 @@ int hdfsConnectAllocated(hdfsFS fs, struct hdfsBuilder *bld) { return 0; } +LIBHDFS_C_API hdfsFS hdfsConnect(const char *nn, tPort port) { return hdfsConnectAsUser(nn, port, ""); } +LIBHDFS_C_API hdfsFS hdfsConnectAsUser(const char* nn, tPort port, const char *user) { return doHdfsConnect(std::string(nn), port, std::string(user), Options()); } +LIBHDFS_C_API hdfsFS hdfsConnectAsUserNewInstance(const char* nn, tPort port, const char *user ) { //libhdfspp always returns a new instance return doHdfsConnect(std::string(nn), port, std::string(user), Options()); } +LIBHDFS_C_API hdfsFS hdfsConnectNewInstance(const char* nn, tPort port) { //libhdfspp always returns a new instance return hdfsConnectAsUser(nn, port, ""); } +LIBHDFSPP_EXT_API int hdfsCancelPendingConnection(hdfsFS fs) { // todo: stick an enum in hdfs_internal to check the connect state if(!CheckSystem(fs)) { @@ -458,6 +474,7 @@ int hdfsCancelPendingConnection(hdfsFS fs) { } } +LIBHDFS_C_API int hdfsDisconnect(hdfsFS fs) { try { @@ -476,6 +493,7 @@ int hdfsDisconnect(hdfsFS fs) { } } +LIBHDFS_C_API hdfsFile hdfsOpenFile(hdfsFS fs, const char *path, int flags, int bufferSize, short replication, tSize blocksize) { try @@ -512,6 +530,7 @@ hdfsFile hdfsOpenFile(hdfsFS fs, const char *path, int flags, int bufferSize, } } +LIBHDFS_C_API int hdfsCloseFile(hdfsFS fs, hdfsFile file) { try { @@ -528,6 +547,7 @@ int hdfsCloseFile(hdfsFS fs, hdfsFile file) { } } +LIBHDFS_C_API char* hdfsGetWorkingDirectory(hdfsFS fs, char *buffer, size_t bufferSize) { try { @@ -556,6 +576,7 @@ char* hdfsGetWorkingDirectory(hdfsFS fs, char *buffer, size_t bufferSize) { } } +LIBHDFS_C_API int hdfsSetWorkingDirectory(hdfsFS fs, const char* path) { try { @@ -582,6 +603,7 @@ int hdfsSetWorkingDirectory(hdfsFS fs, const char* path) { } } +LIBHDFS_C_API int hdfsAvailable(hdfsFS fs, hdfsFile file) { //Since we do not have read ahead implemented, return 0 if fs and file are good; errno = 0; @@ -591,6 +613,7 @@ int hdfsAvailable(hdfsFS fs, hdfsFile file) { return 0; } +LIBHDFS_C_API tOffset hdfsGetDefaultBlockSize(hdfsFS fs) { try { errno = 0; @@ -604,6 +627,7 @@ tOffset hdfsGetDefaultBlockSize(hdfsFS fs) { } } +LIBHDFS_C_API tOffset hdfsGetDefaultBlockSizeAtPath(hdfsFS fs, const char *path) { try { errno = 0; @@ -633,6 +657,7 @@ tOffset hdfsGetDefaultBlockSizeAtPath(hdfsFS fs, const char *path) { } } +LIBHDFS_C_API int hdfsSetReplication(hdfsFS fs, const char* path, int16_t replication) { try { errno = 0; @@ -659,6 +684,7 @@ int hdfsSetReplication(hdfsFS fs, const char* path, int16_t replication) { } } +LIBHDFS_C_API int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime) { try { errno = 0; @@ -682,6 +708,7 @@ int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime) { } } +LIBHDFS_C_API tOffset hdfsGetCapacity(hdfsFS fs) { try { errno = 0; @@ -705,6 +732,7 @@ tOffset hdfsGetCapacity(hdfsFS fs) { } } +LIBHDFS_C_API tOffset hdfsGetUsed(hdfsFS fs) { try { errno = 0; @@ -777,6 +805,7 @@ void StatInfoToHdfsFileInfo(hdfsFileInfo * file_info, file_info->mLastAccess = stat_info.access_time; } +LIBHDFS_C_API int hdfsExists(hdfsFS fs, const char *path) { try { errno = 0; @@ -800,6 +829,7 @@ int hdfsExists(hdfsFS fs, const char *path) { } } +LIBHDFS_C_API hdfsFileInfo *hdfsGetPathInfo(hdfsFS fs, const char* path) { try { 
errno = 0; @@ -828,6 +858,7 @@ hdfsFileInfo *hdfsGetPathInfo(hdfsFS fs, const char* path) { } } +LIBHDFS_C_API hdfsFileInfo *hdfsListDirectory(hdfsFS fs, const char* path, int *numEntries) { try { errno = 0; @@ -868,6 +899,7 @@ hdfsFileInfo *hdfsListDirectory(hdfsFS fs, const char* path, int *numEntries) { } } +LIBHDFS_C_API void hdfsFreeFileInfo(hdfsFileInfo *hdfsFileInfo, int numEntries) { errno = 0; @@ -880,6 +912,7 @@ void hdfsFreeFileInfo(hdfsFileInfo *hdfsFileInfo, int numEntries) delete[] hdfsFileInfo; } +LIBHDFS_C_API int hdfsCreateDirectory(hdfsFS fs, const char* path) { try { errno = 0; @@ -904,6 +937,7 @@ int hdfsCreateDirectory(hdfsFS fs, const char* path) { } } +LIBHDFS_C_API int hdfsDelete(hdfsFS fs, const char* path, int recursive) { try { errno = 0; @@ -927,6 +961,7 @@ int hdfsDelete(hdfsFS fs, const char* path, int recursive) { } } +LIBHDFS_C_API int hdfsRename(hdfsFS fs, const char* oldPath, const char* newPath) { try { errno = 0; @@ -951,6 +986,7 @@ int hdfsRename(hdfsFS fs, const char* oldPath, const char* newPath) { } } +LIBHDFS_C_API int hdfsChmod(hdfsFS fs, const char* path, short mode){ try { errno = 0; @@ -977,6 +1013,7 @@ int hdfsChmod(hdfsFS fs, const char* path, short mode){ } } +LIBHDFS_C_API int hdfsChown(hdfsFS fs, const char* path, const char *owner, const char *group){ try { errno = 0; @@ -1003,6 +1040,7 @@ int hdfsChown(hdfsFS fs, const char* path, const char *owner, const char *group) } } +LIBHDFSPP_EXT_API hdfsFileInfo * hdfsFind(hdfsFS fs, const char* path, const char* name, uint32_t * numEntries){ try { errno = 0; @@ -1041,6 +1079,7 @@ hdfsFileInfo * hdfsFind(hdfsFS fs, const char* path, const char* name, uint32_t } } +LIBHDFSPP_EXT_API int hdfsCreateSnapshot(hdfsFS fs, const char* path, const char* name) { try { errno = 0; @@ -1068,6 +1107,7 @@ int hdfsCreateSnapshot(hdfsFS fs, const char* path, const char* name) { } } +LIBHDFSPP_EXT_API int hdfsDeleteSnapshot(hdfsFS fs, const char* path, const char* name) { try { errno = 0; @@ -1125,6 +1165,7 @@ int hdfsRenameSnapshot(hdfsFS fs, const char* path, const char* old_name, const } +LIBHDFSPP_EXT_API int hdfsAllowSnapshot(hdfsFS fs, const char* path) { try { errno = 0; @@ -1148,6 +1189,7 @@ int hdfsAllowSnapshot(hdfsFS fs, const char* path) { } } +LIBHDFSPP_EXT_API int hdfsDisallowSnapshot(hdfsFS fs, const char* path) { try { errno = 0; @@ -1171,6 +1213,7 @@ int hdfsDisallowSnapshot(hdfsFS fs, const char* path) { } } +LIBHDFS_C_API tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position, void *buffer, tSize length) { try @@ -1193,6 +1236,7 @@ tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position, void *buffer, } } +LIBHDFS_C_API tSize hdfsRead(hdfsFS fs, hdfsFile file, void *buffer, tSize length) { try { @@ -1215,12 +1259,14 @@ tSize hdfsRead(hdfsFS fs, hdfsFile file, void *buffer, tSize length) { } } +LIBHDFS_C_API int hdfsUnbufferFile(hdfsFile file) { //Currently we are not doing any buffering CheckHandle(file); return -1; } +LIBHDFS_C_API int hdfsFileGetReadStatistics(hdfsFile file, struct hdfsReadStatistics **stats) { try { @@ -1239,6 +1285,7 @@ int hdfsFileGetReadStatistics(hdfsFile file, struct hdfsReadStatistics **stats) } } +LIBHDFS_C_API int hdfsFileClearReadStatistics(hdfsFile file) { try { @@ -1255,16 +1302,19 @@ int hdfsFileClearReadStatistics(hdfsFile file) { } } +LIBHDFS_C_API int64_t hdfsReadStatisticsGetRemoteBytesRead(const struct hdfsReadStatistics *stats) { return stats->totalBytesRead - stats->totalLocalBytesRead; } +LIBHDFS_C_API void hdfsFileFreeReadStatistics(struct 
hdfsReadStatistics *stats) { errno = 0; delete stats; } /* 0 on success, -1 on error*/ +LIBHDFS_C_API int hdfsSeek(hdfsFS fs, hdfsFile file, tOffset desiredPos) { try { @@ -1287,6 +1337,7 @@ int hdfsSeek(hdfsFS fs, hdfsFile file, tOffset desiredPos) { } } +LIBHDFS_C_API tOffset hdfsTell(hdfsFS fs, hdfsFile file) { try { @@ -1326,7 +1377,7 @@ int hdfsCancel(hdfsFS fs, hdfsFile file) { } } - +LIBHDFSPP_EXT_API int hdfsGetBlockLocations(hdfsFS fs, const char *path, struct hdfsBlockLocations ** locations_out) { try @@ -1402,6 +1453,7 @@ int hdfsGetBlockLocations(hdfsFS fs, const char *path, struct hdfsBlockLocations } } +LIBHDFSPP_EXT_API int hdfsFreeBlockLocations(struct hdfsBlockLocations * blockLocations) { errno = 0; if (blockLocations == nullptr) @@ -1422,6 +1474,7 @@ int hdfsFreeBlockLocations(struct hdfsBlockLocations * blockLocations) { return 0; } +LIBHDFS_C_API char*** hdfsGetHosts(hdfsFS fs, const char* path, tOffset start, tOffset length) { try { @@ -1462,6 +1515,7 @@ char*** hdfsGetHosts(hdfsFS fs, const char* path, tOffset start, tOffset length) } } +LIBHDFS_C_API void hdfsFreeHosts(char ***blockHosts) { errno = 0; if (blockHosts == nullptr) @@ -1526,6 +1580,7 @@ event_response file_callback_glue(libhdfspp_file_event_callback handler, return event_response::make_ok(); } +LIBHDFSPP_EXT_API int hdfsPreAttachFSMonitor(libhdfspp_fs_event_callback handler, int64_t cookie) { fs_event_callback callback = std::bind(fs_callback_glue, handler, cookie, _1, _2, _3); @@ -1533,7 +1588,7 @@ int hdfsPreAttachFSMonitor(libhdfspp_fs_event_callback handler, int64_t cookie) return 0; } - +LIBHDFSPP_EXT_API int hdfsPreAttachFileMonitor(libhdfspp_file_event_callback handler, int64_t cookie) { file_event_callback callback = std::bind(file_callback_glue, handler, cookie, _1, _2, _3, _4); @@ -1572,6 +1627,7 @@ hdfsBuilder::hdfsBuilder(const char * directory) : config = LoadDefault(loader); } +LIBHDFS_C_API struct hdfsBuilder *hdfsNewBuilder(void) { try @@ -1587,18 +1643,21 @@ struct hdfsBuilder *hdfsNewBuilder(void) } } +LIBHDFS_C_API void hdfsBuilderSetNameNode(struct hdfsBuilder *bld, const char *nn) { errno = 0; bld->overrideHost = std::string(nn); } +LIBHDFS_C_API void hdfsBuilderSetNameNodePort(struct hdfsBuilder *bld, tPort port) { errno = 0; bld->overridePort = port; } +LIBHDFS_C_API void hdfsBuilderSetUserName(struct hdfsBuilder *bld, const char *userName) { errno = 0; @@ -1607,12 +1666,14 @@ void hdfsBuilderSetUserName(struct hdfsBuilder *bld, const char *userName) } } +LIBHDFS_C_API void hdfsBuilderSetForceNewInstance(struct hdfsBuilder *bld) { //libhdfspp always returns a new instance, so nothing to do (void)bld; errno = 0; } +LIBHDFS_C_API void hdfsFreeBuilder(struct hdfsBuilder *bld) { try @@ -1626,6 +1687,7 @@ void hdfsFreeBuilder(struct hdfsBuilder *bld) } } +LIBHDFS_C_API int hdfsBuilderConfSetStr(struct hdfsBuilder *bld, const char *key, const char *val) { @@ -1650,16 +1712,22 @@ int hdfsBuilderConfSetStr(struct hdfsBuilder *bld, const char *key, } } +LIBHDFS_C_API void hdfsConfStrFree(char *val) { errno = 0; free(val); } +LIBHDFS_C_API hdfsFS hdfsBuilderConnect(struct hdfsBuilder *bld) { - return doHdfsConnect(bld->overrideHost, bld->overridePort, bld->user, bld->config.GetOptions()); + hdfsFS fs = doHdfsConnect(bld->overrideHost, bld->overridePort, bld->user, bld->config.GetOptions()); + // Always free the builder + hdfsFreeBuilder(bld); + return fs; } +LIBHDFS_C_API int hdfsConfGetStr(const char *key, char **val) { try @@ -1674,6 +1742,7 @@ int hdfsConfGetStr(const char *key, char 
**val) } } +LIBHDFS_C_API int hdfsConfGetInt(const char *key, int32_t *val) { try @@ -1706,6 +1775,7 @@ struct hdfsBuilder *hdfsNewBuilderFromDirectory(const char * configDirectory) } } +LIBHDFSPP_EXT_API int hdfsBuilderConfGetStr(struct hdfsBuilder *bld, const char *key, char **val) { @@ -1739,6 +1809,7 @@ bool isValidInt(int64_t value) value <= std::numeric_limits<int>::max()); } +LIBHDFSPP_EXT_API int hdfsBuilderConfGetInt(struct hdfsBuilder *bld, const char *key, int32_t *val) { try @@ -1765,6 +1836,7 @@ int hdfsBuilderConfGetInt(struct hdfsBuilder *bld, const char *key, int32_t *val } } +LIBHDFSPP_EXT_API int hdfsBuilderConfGetLong(struct hdfsBuilder *bld, const char *key, int64_t *val) { try @@ -1859,15 +1931,17 @@ void CForwardingLogger::FreeLogData(LogData *data) { free(data); } - +LIBHDFSPP_EXT_API LogData *hdfsCopyLogData(LogData *data) { return CForwardingLogger::CopyLogData(data); } +LIBHDFSPP_EXT_API void hdfsFreeLogData(LogData *data) { CForwardingLogger::FreeLogData(data); } +LIBHDFSPP_EXT_API void hdfsSetLogFunction(void (*callback)(LogData*)) { CForwardingLogger *logger = new CForwardingLogger(); logger->SetCallback(callback); @@ -1900,6 +1974,7 @@ static bool IsComponentValid(int component) { return true; } +LIBHDFSPP_EXT_API int hdfsEnableLoggingForComponent(int component) { errno = 0; if(!IsComponentValid(component)) @@ -1908,6 +1983,7 @@ int hdfsEnableLoggingForComponent(int component) { return 0; } +LIBHDFSPP_EXT_API int hdfsDisableLoggingForComponent(int component) { errno = 0; if(!IsComponentValid(component)) @@ -1916,6 +1992,7 @@ int hdfsDisableLoggingForComponent(int component) { return 0; } +LIBHDFSPP_EXT_API int hdfsSetLoggingLevel(int level) { errno = 0; if(!IsLevelValid(level)) @@ -1923,3 +2000,8 @@ int hdfsSetLoggingLevel(int level) { LogManager::SetLogLevel(static_cast<LogLevel>(level)); return 0; } + +#undef LIBHDFS_C_API +#undef LIBHDFSPP_EXT_API + + http://git-wip-us.apache.org/repos/asf/hadoop/blob/48db24a4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.cc index 9d9a816..e83a28c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.cc @@ -52,8 +52,10 @@ HANamenodeTracker::HANamenodeTracker(const std::vector<ResolvedNamenodeInfo> &se active_info_ = servers[0]; standby_info_ = servers[1]; - LOG_INFO(kRPC, << "Active namenode url = " << active_info_.uri.str()); - LOG_INFO(kRPC, << "Standby namenode url = " << standby_info_.uri.str()); + LOG_INFO(kRPC, << "HA enabled. Using the following namenodes from the configuration." 
+ << "\nNote: Active namenode cannot be determined until a connection has been made.") + LOG_INFO(kRPC, << "First namenode url = " << active_info_.uri.str()); + LOG_INFO(kRPC, << "Second namenode url = " << standby_info_.uri.str()); enabled_ = true; if(!active_info_.endpoints.empty() || !standby_info_.endpoints.empty()) { @@ -64,51 +66,57 @@ HANamenodeTracker::HANamenodeTracker(const std::vector<ResolvedNamenodeInfo> &se HANamenodeTracker::~HANamenodeTracker() {} -// Pass in endpoint from current connection, this will do a reverse lookup -// and return the info for the standby node. It will also swap its state internally. -ResolvedNamenodeInfo HANamenodeTracker::GetFailoverAndUpdate(::asio::ip::tcp::endpoint current_endpoint) { - LOG_TRACE(kRPC, << "Swapping from endpoint " << current_endpoint); +bool HANamenodeTracker::GetFailoverAndUpdate(const std::vector<::asio::ip::tcp::endpoint>& current_endpoints, + ResolvedNamenodeInfo& out) +{ mutex_guard swap_lock(swap_lock_); - ResolvedNamenodeInfo failover_node; + // Cannot look up without a key. + if(current_endpoints.size() == 0) { + event_handlers_->call(FS_NN_EMPTY_ENDPOINTS_EVENT, active_info_.nameservice.c_str(), + 0 /*Not much to say about context without endpoints*/); + LOG_ERROR(kRPC, << "HANamenodeTracker@" << this << "::GetFailoverAndUpdate requires at least 1 endpoint."); + return false; + } + + LOG_TRACE(kRPC, << "Swapping from endpoint " << current_endpoints[0]); - // Connected to standby, switch standby to active - if(IsCurrentActive_locked(current_endpoint)) { + if(IsCurrentActive_locked(current_endpoints[0])) { std::swap(active_info_, standby_info_); if(event_handlers_) event_handlers_->call(FS_NN_FAILOVER_EVENT, active_info_.nameservice.c_str(), reinterpret_cast<int64_t>(active_info_.uri.str().c_str())); - failover_node = active_info_; - } else if(IsCurrentStandby_locked(current_endpoint)) { + out = active_info_; + } else if(IsCurrentStandby_locked(current_endpoints[0])) { // Connected to standby if(event_handlers_) event_handlers_->call(FS_NN_FAILOVER_EVENT, active_info_.nameservice.c_str(), reinterpret_cast<int64_t>(active_info_.uri.str().c_str())); - failover_node = active_info_; + out = active_info_; } else { - // Invalid state, throw for testing - std::string ep1 = format_endpoints(active_info_.endpoints); - std::string ep2 = format_endpoints(standby_info_.endpoints); - - std::stringstream msg; - msg << "Looked for " << current_endpoint << " in\n"; - msg << ep1 << " and\n"; - msg << ep2 << std::endl; - - LOG_ERROR(kRPC, << "Unable to find RPC connection in config " << msg.str() << ". Bailing out."); - throw std::runtime_error(msg.str()); + // Invalid state (or a NIC was added that didn't show up during DNS) + std::stringstream errorMsg; // asio specializes endpoing operator<< for stringstream + errorMsg << "Unable to find RPC connection in config. Looked for " << current_endpoints[0] << " in\n" + << format_endpoints(active_info_.endpoints) << " and\n" + << format_endpoints(standby_info_.endpoints) << std::endl; + LOG_ERROR(kRPC, << errorMsg.str()); + return false; } - if(failover_node.endpoints.empty()) { - LOG_WARN(kRPC, << "No endpoints for node " << failover_node.uri.str() << " attempting to resolve again"); - if(!ResolveInPlace(ioservice_, failover_node)) { - LOG_ERROR(kRPC, << "Fallback endpoint resolution for node " << failover_node.uri.str() - << "failed. 
Please make sure your configuration is up to date."); + // Extra DNS on swapped node to try and get EPs if it didn't already have them + if(out.endpoints.empty()) { + LOG_WARN(kRPC, << "No endpoints for node " << out.uri.str() << " attempting to resolve again"); + if(!ResolveInPlace(ioservice_, out)) { + // Stuck retrying against the same NN that was able to be resolved in this case + LOG_ERROR(kRPC, << "Fallback endpoint resolution for node " << out.uri.str() + << " failed. Please make sure your configuration is up to date."); } } - return failover_node; + + return true; } + bool HANamenodeTracker::IsCurrentActive_locked(const ::asio::ip::tcp::endpoint &ep) const { for(unsigned int i=0;i<active_info_.endpoints.size();i++) { if(ep.address() == active_info_.endpoints[i].address()) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/48db24a4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.h index f51e13c..cc34f51 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.h @@ -48,15 +48,18 @@ class HANamenodeTracker { bool is_enabled() const { return enabled_; } bool is_resolved() const { return resolved_; } - // Get node opposite of the current one if possible (swaps active/standby) + // Pass in vector of endpoints held by RpcConnection, use endpoints to infer node + // currently being used. Swap internal state and set out to other node. // Note: This will always mutate internal state. 
Use IsCurrentActive/Standby to // get info without changing state - ResolvedNamenodeInfo GetFailoverAndUpdate(::asio::ip::tcp::endpoint current_endpoint); + bool GetFailoverAndUpdate(const std::vector<::asio::ip::tcp::endpoint>& current_endpoints, + ResolvedNamenodeInfo& out); + private: + // See if endpoint ep is part of the list of endpoints for the active or standby NN bool IsCurrentActive_locked(const ::asio::ip::tcp::endpoint &ep) const; bool IsCurrentStandby_locked(const ::asio::ip::tcp::endpoint &ep) const; - private: // If HA should be enabled, according to our options and runtime info like # nodes provided bool enabled_; // If we were able to resolve at least 1 HA namenode http://git-wip-us.apache.org/repos/asf/hadoop/blob/48db24a4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc index f59f2ce..34af423 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc @@ -213,6 +213,11 @@ void RpcEngine::RpcCommsError( optional<RetryAction> head_action = optional<RetryAction>(); // Filter out anything with too many retries already + if(event_handlers_) { + event_handlers_->call(FS_NN_PRE_RPC_RETRY_EVENT, "RpcCommsError", + reinterpret_cast<int64_t>(this)); + } + for (auto it = pendingRequests.begin(); it < pendingRequests.end();) { auto req = *it; @@ -261,15 +266,34 @@ void RpcEngine::RpcCommsError( // If HA is enabled and we have valid HA info then fail over to the standby (hopefully now active) if(head_action->action == RetryAction::FAILOVER_AND_RETRY && ha_persisted_info_) { - for(unsigned int i=0; i<pendingRequests.size();i++) + for(unsigned int i=0; i<pendingRequests.size();i++) { pendingRequests[i]->IncrementFailoverCount(); + } - ResolvedNamenodeInfo new_active_nn_info = - ha_persisted_info_->GetFailoverAndUpdate(last_endpoints_[0]/*reverse lookup*/); + ResolvedNamenodeInfo new_active_nn_info; + bool failoverInfoFound = ha_persisted_info_->GetFailoverAndUpdate(last_endpoints_, new_active_nn_info); + if(!failoverInfoFound) { + // This shouldn't be a common case; the set of endpoints was empty, likely due to DNS issues. + // Another possibility is a network device has been added or removed due to a VM starting or stopping. + + LOG_ERROR(kRPC, << "Failed to find endpoints for the alternate namenode." + << " Make sure Namenode hostnames can be found with a DNS lookup."); + // Kill all pending RPC requests since there's nowhere for this to go + Status badEndpointStatus = Status::Error("No endpoints found for namenode"); + + for(unsigned int i=0; i<pendingRequests.size(); i++) { + std::shared_ptr<Request> sharedCurrentRequest = pendingRequests[i]; + io_service().post([sharedCurrentRequest, badEndpointStatus]() { + sharedCurrentRequest->OnResponseArrived(nullptr, badEndpointStatus); // Never call back while holding a lock + }); + } - LOG_INFO(kRPC, << "Going to try connecting to alternate Namenode: " << new_active_nn_info.uri.str()); + // Clear request vector. This isn't a recoverable error.
+ pendingRequests.clear(); + } if(ha_persisted_info_->is_resolved()) { + LOG_INFO(kRPC, << "Going to try connecting to alternate Namenode: " << new_active_nn_info.uri.str()); last_endpoints_ = new_active_nn_info.endpoints; } else { LOG_WARN(kRPC, << "It looks HA is turned on, but unable to fail over. has info=" http://git-wip-us.apache.org/repos/asf/hadoop/blob/48db24a4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt index 0b4581e..3331935 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt @@ -120,6 +120,12 @@ add_executable(user_lock_test user_lock_test.cc) target_link_libraries(user_lock_test fs gmock_main common ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} ${SASL_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT}) add_memcheck_test(user_lock user_lock_test) +add_executable(hdfs_config_connect_bugs_test hdfs_config_connect_bugs.cc) +target_link_libraries(hdfs_config_connect_bugs_test common gmock_main bindings_c fs rpc proto common reader connection ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} ${SASL_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT}) +add_memcheck_test(hdfs_config_connect_bugs hdfs_config_connect_bugs_test) + + + # # # INTEGRATION TESTS - TESTS THE FULL LIBRARY AGAINST ACTUAL SERVERS @@ -136,7 +142,7 @@ include_directories ( add_library(hdfspp_test_shim_static STATIC hdfs_shim.c libhdfs_wrapper.c libhdfspp_wrapper.cc ${LIBHDFSPP_BINDING_C}/hdfs.cc) -# TODO: get all of the mini dfs library bits here in one plase +# TODO: get all of the mini dfs library bits here in one place # add_library(hdfspp_mini_cluster native_mini_dfs ${JAVA_JVM_LIBRARY} ) #TODO: Link against full library rather than just parts @@ -157,4 +163,8 @@ build_libhdfs_test(hdfs_ext hdfspp_test_shim_static ${CMAKE_CURRENT_LIST_DIR}/hd link_libhdfs_test (hdfs_ext hdfspp_test_shim_static hdfspp_static gmock_main native_mini_dfs ${JAVA_JVM_LIBRARY} ${SASL_LIBRARIES}) add_libhdfs_test (hdfs_ext hdfspp_test_shim_static) +#build_libhdfs_test(hdfs_config_connect_bugs hdfspp_test_shim_static ${CMAKE_CURRENT_LIST_DIR}/hdfs_config_connect_bugs.cc) +#link_libhdfs_test (hdfs_config_connect_bugs hdfspp_test_shim_static hdfspp_static gmock_main native_mini_dfs ${JAVA_JVM_LIBRARY} ${SASL_LIBRARIES}) +#add_libhdfs_test (hdfs_config_connect_bugs hdfspp_test_shim_static) + endif(HADOOP_BUILD) http://git-wip-us.apache.org/repos/asf/hadoop/blob/48db24a4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_config_connect_bugs.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_config_connect_bugs.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_config_connect_bugs.cc new file mode 100644 index 0000000..fc31227 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_config_connect_bugs.cc @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "hdfspp/hdfs_ext.h" + +#include "configuration_test.h" + +#include <google/protobuf/stubs/common.h> + +#include <cstring> +#include <chrono> +#include <exception> + + +static const char *hdfs_11294_core_site_txt = +"<configuration>\n" +" <property name=\"fs.defaultFS\" value=\"hdfs://NAMESERVICE1\"/>\n" +" <property name=\"hadoop.security.authentication\" value=\"simple\"/>\n" +" <property name=\"ipc.client.connect.retry.interval\" value=\"1\"/>\n" +"</configuration>\n"; + +static const char *hdfs_11294_hdfs_site_txt = +"<configuration>\n" +" <property>\n" +" <name>dfs.nameservices</name>\n" +" <value>NAMESERVICE1</value>\n" +" </property>\n" +" <property>\n" +" <name>dfs.ha.namenodes.NAMESERVICE1</name>\n" +" <value>nn1, nn2</value>\n" +" </property>\n" +" <property>\n" +" <name>dfs.namenode.rpc-address.NAMESERVICE1.nn1</name>\n" +" <value>nonesuch1.apache.org:8020</value>\n" +" </property>\n" +" <property>\n" +" <name>dfs.namenode.servicerpc-address.NAMESERVICE1.nn1</name>\n" +" <value>nonesuch1.apache.org:8040</value>\n" +" </property>\n" +" <property>\n" +" <name>dfs.namenode.http-address.NAMESERVICE1.nn1</name>\n" +" <value>nonesuch1.apache.org:50070</value>\n" +" </property>\n" +" <property>\n" +" <name>dfs.namenode.rpc-address.NAMESERVICE1.nn2</name>\n" +" <value>nonesuch2.apache.org:8020</value>\n" +" </property>\n" +" <property>\n" +" <name>dfs.namenode.servicerpc-address.NAMESERVICE1.nn2</name>\n" +" <value>nonesuch2.apache.org:8040</value>\n" +" </property>\n" +" <property>\n" +" <name>dfs.namenode.http-address.NAMESERVICE1.nn2</name>\n" +" <value>nonesuch2.apache.org:50070</value>\n" +" </property>\n" +"</configuration>\n"; + + + + namespace hdfs { + +// Regression test for HDFS-11294: connecting when neither HA namenode resolves must fail cleanly, not crash +TEST(ConfigConnectBugs, Test_HDFS_11294) { + // Directory for hdfs config + TempDir td; + + const std::string& tempDirPath = td.path; + const std::string coreSitePath = tempDirPath + "/core-site.xml"; + const std::string hdfsSitePath = tempDirPath + "/hdfs-site.xml"; + + // Write configs + FILE *coreSite = fopen(coreSitePath.c_str(), "w"); + EXPECT_NE(coreSite, nullptr); + size_t coreSiteLength = strlen(hdfs_11294_core_site_txt); + size_t res = fwrite(hdfs_11294_core_site_txt, 1, coreSiteLength, coreSite); + EXPECT_EQ(res, coreSiteLength); + EXPECT_EQ(fclose(coreSite), 0); + + FILE *hdfsSite = fopen(hdfsSitePath.c_str(), "w"); + EXPECT_NE(hdfsSite, nullptr); + size_t hdfsSiteLength = strlen(hdfs_11294_hdfs_site_txt); + res = fwrite(hdfs_11294_hdfs_site_txt, 1, hdfsSiteLength, hdfsSite); + EXPECT_EQ(res, hdfsSiteLength); + EXPECT_EQ(fclose(hdfsSite), 0); + + // Load configs with new FS + hdfsBuilder *bld = hdfsNewBuilderFromDirectory(tempDirPath.c_str()); + hdfsBuilderSetNameNode(bld, "NAMESERVICE1"); + + // In HDFS-11294 connecting would crash because DNS couldn't resolve +
// endpoints but the RpcEngine would attempt to dereference a non-existent + // element in a std::vector and crash. Test passes if connect doesn't crash. + hdfsFS fileSystem = hdfsBuilderConnect(bld); + + // FS shouldn't be created if it can't connect. + EXPECT_EQ(fileSystem, nullptr); + + // Verify it got to endpoint check + char errMsgBuf[100]; + memset(errMsgBuf, 0, 100); + EXPECT_EQ(hdfsGetLastError(errMsgBuf, 100), 0); + EXPECT_STREQ(errMsgBuf, "Exception:No endpoints found for namenode"); + + + // remove config files + EXPECT_EQ(remove(coreSitePath.c_str()), 0); + EXPECT_EQ(remove(hdfsSitePath.c_str()), 0); +} + +} // end namespace hdfs + +int main(int argc, char *argv[]) { + // The following line must be executed to initialize Google Mock + // (and Google Test) before running the tests. + ::testing::InitGoogleMock(&argc, argv); + int exit_code = RUN_ALL_TESTS(); + google::protobuf::ShutdownProtobufLibrary(); + + return exit_code; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/48db24a4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc index 2b6cdbc..776894e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc @@ -398,14 +398,20 @@ TEST(RpcEngineTest, TestEventCallbacks) io_service.stop(); ASSERT_TRUE(stat.ok()); }); + + // If you're adding event hooks you'll most likely need to update this. + // It's a brittle test but makes it hard to miss control flow changes in RPC retry. + for(const auto& m : callbacks) + std::cerr << m << std::endl; io_service.run(); ASSERT_TRUE(complete); - ASSERT_EQ(8, callbacks.size()); + ASSERT_EQ(9, callbacks.size()); ASSERT_EQ(FS_NN_CONNECT_EVENT, callbacks[0]); // error - ASSERT_EQ(FS_NN_CONNECT_EVENT, callbacks[1]); // reconnect - ASSERT_EQ(FS_NN_READ_EVENT, callbacks[2]); // makes an error - ASSERT_EQ(FS_NN_CONNECT_EVENT, callbacks[3]); // reconnect - for (int i=4; i < 7; i++) + ASSERT_EQ(FS_NN_PRE_RPC_RETRY_EVENT, callbacks[1]); // figure out retry decision + ASSERT_EQ(FS_NN_CONNECT_EVENT, callbacks[2]); // reconnect + ASSERT_EQ(FS_NN_PRE_RPC_RETRY_EVENT, callbacks[3]); // retry decision after the read error + ASSERT_EQ(FS_NN_CONNECT_EVENT, callbacks[4]); // reconnect + for (int i=5; i < 8; i++) ASSERT_EQ(FS_NN_READ_EVENT, callbacks[i]); }
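For clients of the extended C API, the new events can be observed through the pre-attach monitor hook annotated above with LIBHDFSPP_EXT_API. A hedged sketch follows; the parameter order of libhdfspp_fs_event_callback (event name, cluster/nameservice, value, caller-supplied cookie) and the meaning of a 0 return are assumptions based on hdfs_ext.h rather than anything spelled out in this diff:

    #include <cstdint>
    #include <cstdio>
    #include <cstring>

    // Assumed callback shape; verify against hdfs_ext.h before relying on it.
    extern "C" int nn_event_monitor(const char* event, const char* cluster,
                                    int64_t value, int64_t cookie) {
      (void)cookie;  // opaque value passed through from registration
      if (std::strcmp(event, "NN::failover") == 0) {
        // Per events.h, value holds an address readable as the const char*
        // URI of the namenode the failover will try next.
        std::fprintf(stderr, "[%s] failing over to %s\n", cluster,
                     reinterpret_cast<const char*>(value));
      } else if (std::strcmp(event, "NN::bad_failover::no_endpoints") == 0) {
        std::fprintf(stderr, "[%s] failover attempted with no resolved endpoints\n",
                     cluster);
      } else if (std::strcmp(event, "NN::rpc::get_retry_action") == 0) {
        std::fprintf(stderr, "retry/discard decision pending for failed RPCs\n");
      }
      return 0;  // assumed to mean "no intervention requested"
    }

    // Register before any FS is created, per the hook added in hdfs.cc:
    //   hdfsPreAttachFSMonitor(nn_event_monitor, 0);

A monitor like this would surface NN::bad_failover::no_endpoints in the HDFS-11294 scenario instead of a core dump.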