http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/bad_datanode_tracker.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/bad_datanode_tracker.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/bad_datanode_tracker.h new file mode 100644 index 0000000..1d596ad --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/bad_datanode_tracker.h @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +#ifndef LIBHDFSPP_BADDATANODETRACKER_H +#define LIBHDFSPP_BADDATANODETRACKER_H + +#include <mutex> +#include <chrono> +#include <map> +#include <string> +#include <set> + +#include "hdfspp/options.h" +#include "hdfspp/hdfspp.h" + +namespace hdfs { + +/** + * ExclusionSet is a simple override that can be filled with known + * bad node UUIDs and passed to AsyncPreadSome. + **/ +class ExclusionSet : public NodeExclusionRule { + public: + ExclusionSet(const std::set<std::string>& excluded); + virtual ~ExclusionSet(); + virtual bool IsBadNode(const std::string& node_uuid); + + private: + std::set<std::string> excluded_; +}; + +/** + * BadDataNodeTracker keeps a timestamped list of datanodes that have + * failed during past operations. Entries present in this list will + * not be used for new requests. Entries will be evicted from the list + * after a period of time has elapsed; the default is 10 minutes. + */ +class BadDataNodeTracker : public NodeExclusionRule { + public: + BadDataNodeTracker(const Options& options = Options()); + virtual ~BadDataNodeTracker(); + /* add a bad DN to the list */ + void AddBadNode(const std::string& dn); + /* check if a node should be excluded */ + virtual bool IsBadNode(const std::string& dn); + /* only for tests, shift clock by t milliseconds*/ + void TEST_set_clock_shift(int t); + + private: + typedef std::chrono::steady_clock Clock; + typedef std::chrono::time_point<Clock> TimePoint; + bool TimeoutExpired(const TimePoint& t); + /* after timeout_duration_ elapses remove DN */ + const unsigned int timeout_duration_; /* milliseconds */ + std::map<std::string, TimePoint> datanodes_; + std::mutex datanodes_update_lock_; + int test_clock_shift_; +}; +} +#endif
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc new file mode 100644 index 0000000..ba702b0 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc @@ -0,0 +1,370 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "filehandle.h" +#include "common/continuation/continuation.h" +#include "common/logging.h" +#include "connection/datanodeconnection.h" +#include "reader/block_reader.h" +#include "hdfspp/events.h" + +#include <future> +#include <tuple> + +#define FMT_THIS_ADDR "this=" << (void*)this + +namespace hdfs { + +using ::hadoop::hdfs::LocatedBlocksProto; + +FileHandle::~FileHandle() {} + +FileHandleImpl::FileHandleImpl(const std::string & cluster_name, + const std::string & path, + ::asio::io_service *io_service, const std::string &client_name, + const std::shared_ptr<const struct FileInfo> file_info, + std::shared_ptr<BadDataNodeTracker> bad_data_nodes, + std::shared_ptr<LibhdfsEvents> event_handlers) + : cluster_name_(cluster_name), path_(path), io_service_(io_service), client_name_(client_name), file_info_(file_info), + bad_node_tracker_(bad_data_nodes), offset_(0), cancel_state_(CancelTracker::New()), event_handlers_(event_handlers), bytes_read_(0) { + LOG_TRACE(kFileHandle, << "FileHandleImpl::FileHandleImpl(" + << FMT_THIS_ADDR << ", ...) called"); + +} + +void FileHandleImpl::PositionRead( + void *buf, size_t buf_size, uint64_t offset, + const std::function<void(const Status &, size_t)> &handler) { + LOG_DEBUG(kFileHandle, << "FileHandleImpl::PositionRead(" + << FMT_THIS_ADDR << ", buf=" << buf + << ", buf_size=" << std::to_string(buf_size) << ") called"); + + /* prevent usage after cancelation */ + if(cancel_state_->is_canceled()) { + handler(Status::Canceled(), 0); + return; + } + + auto callback = [this, handler](const Status &status, + const std::string &contacted_datanode, + size_t bytes_read) { + /* determine if DN gets marked bad */ + if (ShouldExclude(status)) { + bad_node_tracker_->AddBadNode(contacted_datanode); + } + + bytes_read_ += bytes_read; + handler(status, bytes_read); + }; + + AsyncPreadSome(offset, asio::buffer(buf, buf_size), bad_node_tracker_, callback); +} + +Status FileHandleImpl::PositionRead(void *buf, size_t buf_size, off_t offset, size_t *bytes_read) { + LOG_DEBUG(kFileHandle, << "FileHandleImpl::[sync]PositionRead(" + << FMT_THIS_ADDR << ", buf=" << buf + << ", buf_size=" << std::to_string(buf_size) + << ", offset=" << offset << ") called"); + + auto callstate = std::make_shared<std::promise<std::tuple<Status, size_t>>>(); + std::future<std::tuple<Status, size_t>> future(callstate->get_future()); + + /* wrap async call with promise/future to make it blocking */ + auto callback = [callstate](const Status &s, size_t bytes) { + callstate->set_value(std::make_tuple(s,bytes)); + }; + + PositionRead(buf, buf_size, offset, callback); + + /* wait for async to finish */ + auto returnstate = future.get(); + auto stat = std::get<0>(returnstate); + + if (!stat.ok()) { + return stat; + } + + *bytes_read = std::get<1>(returnstate); + return stat; +} + +Status FileHandleImpl::Read(void *buf, size_t buf_size, size_t *bytes_read) { + LOG_DEBUG(kFileHandle, << "FileHandleImpl::Read(" + << FMT_THIS_ADDR << ", buf=" << buf + << ", buf_size=" << std::to_string(buf_size) << ") called"); + + Status stat = PositionRead(buf, buf_size, offset_, bytes_read); + if(!stat.ok()) { + return stat; + } + + offset_ += *bytes_read; + return Status::OK(); +} + +Status FileHandleImpl::Seek(off_t *offset, std::ios_base::seekdir whence) { + LOG_DEBUG(kFileHandle, << "FileHandleImpl::Seek(" + << ", offset=" << *offset << ", ...) called"); + + if(cancel_state_->is_canceled()) { + return Status::Canceled(); + } + + off_t new_offset = -1; + + switch (whence) { + case std::ios_base::beg: + new_offset = *offset; + break; + case std::ios_base::cur: + new_offset = offset_ + *offset; + break; + case std::ios_base::end: + new_offset = file_info_->file_length_ + *offset; + break; + default: + /* unsupported */ + return Status::InvalidArgument("Invalid Seek whence argument"); + } + + if(!CheckSeekBounds(new_offset)) { + return Status::InvalidArgument("Seek offset out of bounds"); + } + offset_ = new_offset; + + *offset = offset_; + return Status::OK(); +} + +/* return false if seek will be out of bounds */ +bool FileHandleImpl::CheckSeekBounds(ssize_t desired_position) { + ssize_t file_length = file_info_->file_length_; + + if (desired_position < 0 || desired_position > file_length) { + return false; + } + + return true; +} + +/* + * Note that this method must be thread-safe w.r.t. the unsafe operations occurring + * on the FileHandle + */ +void FileHandleImpl::AsyncPreadSome( + size_t offset, const MutableBuffers &buffers, + std::shared_ptr<NodeExclusionRule> excluded_nodes, + const std::function<void(const Status &, const std::string &, size_t)> handler) { + using ::hadoop::hdfs::DatanodeInfoProto; + using ::hadoop::hdfs::LocatedBlockProto; + + LOG_DEBUG(kFileHandle, << "FileHandleImpl::AsyncPreadSome(" + << FMT_THIS_ADDR << ", ...) called"); + + if(cancel_state_->is_canceled()) { + handler(Status::Canceled(), "", 0); + return; + } + + if(offset == file_info_->file_length_) { + handler(Status::OK(), "", 0); + return; + } else if(offset > file_info_->file_length_){ + handler(Status::InvalidOffset("AsyncPreadSome: trying to begin a read past the EOF"), "", 0); + return; + } + + /** + * Note: block and chosen_dn will end up pointing to things inside + * the blocks_ vector. They shouldn't be directly deleted. + **/ + auto block = std::find_if( + file_info_->blocks_.begin(), file_info_->blocks_.end(), [offset](const LocatedBlockProto &p) { + return p.offset() <= offset && offset < p.offset() + p.b().numbytes(); + }); + + if (block == file_info_->blocks_.end()) { + LOG_WARN(kFileHandle, << "FileHandleImpl::AsyncPreadSome(" << FMT_THIS_ADDR + << ", ...) Cannot find corresponding blocks"); + handler(Status::InvalidArgument("Cannot find corresponding blocks"), "", 0); + return; + } + + /** + * If user supplies a rule use it, otherwise use the tracker. + * User is responsible for making sure one of them isn't null. + **/ + std::shared_ptr<NodeExclusionRule> rule = + excluded_nodes != nullptr ? excluded_nodes : bad_node_tracker_; + + auto datanodes = block->locs(); + auto it = std::find_if(datanodes.begin(), datanodes.end(), + [rule](const DatanodeInfoProto &dn) { + return !rule->IsBadNode(dn.id().datanodeuuid()); + }); + + if (it == datanodes.end()) { + LOG_WARN(kFileHandle, << "FileHandleImpl::AsyncPreadSome(" + << FMT_THIS_ADDR << ", ...) No datanodes available"); + + handler(Status::ResourceUnavailable("No datanodes available"), "", 0); + return; + } + + DatanodeInfoProto &chosen_dn = *it; + + std::string dnIpAddr = chosen_dn.id().ipaddr(); + std::string dnHostName = chosen_dn.id().hostname(); + + uint64_t offset_within_block = offset - block->offset(); + uint64_t size_within_block = std::min<uint64_t>( + block->b().numbytes() - offset_within_block, asio::buffer_size(buffers)); + + LOG_DEBUG(kFileHandle, << "FileHandleImpl::AsyncPreadSome(" + << FMT_THIS_ADDR << "), ...) Datanode hostname=" << dnHostName << ", IP Address=" << dnIpAddr + << ", file path=\"" << path_ << "\", offset=" << std::to_string(offset) << ", read size=" << size_within_block); + + // This is where we will put the logic for re-using a DN connection; we can + // steal the FileHandle's dn and put it back when we're done + std::shared_ptr<DataNodeConnection> dn = CreateDataNodeConnection(io_service_, chosen_dn, &block->blocktoken()); + std::string dn_id = dn->uuid_; + std::string client_name = client_name_; + + // Wrap the DN in a block reader to handle the state and logic of the + // block request protocol + std::shared_ptr<BlockReader> reader; + reader = CreateBlockReader(BlockReaderOptions(), dn, event_handlers_); + + // Lambdas cannot capture copies of member variables so we'll make explicit + // copies for it + auto event_handlers = event_handlers_; + auto path = path_; + auto cluster_name = cluster_name_; + + auto read_handler = [reader, event_handlers, cluster_name, path, dn_id, handler](const Status & status, size_t transferred) { + event_response event_resp = event_handlers->call(FILE_DN_READ_EVENT, cluster_name.c_str(), path.c_str(), transferred); +#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED + if (event_resp.response_type() == event_response::kTest_Error) { + handler(event_resp.status(), dn_id, transferred); + return; + } +#endif + + handler(status, dn_id, transferred); + }; + + auto connect_handler = [handler,event_handlers,cluster_name,path,read_handler,block,offset_within_block,size_within_block, buffers, reader, dn_id, client_name] + (Status status, std::shared_ptr<DataNodeConnection> dn) { + (void)dn; + event_response event_resp = event_handlers->call(FILE_DN_CONNECT_EVENT, cluster_name.c_str(), path.c_str(), 0); +#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED + if (event_resp.response_type() == event_response::kTest_Error) { + status = event_resp.status(); + } +#endif + + if (status.ok()) { + reader->AsyncReadBlock( + client_name, *block, offset_within_block, + asio::buffer(buffers, size_within_block), read_handler); + } else { + handler(status, dn_id, 0); + } + }; + + dn->Connect(connect_handler); + + return; +} + +std::shared_ptr<BlockReader> FileHandleImpl::CreateBlockReader(const BlockReaderOptions &options, + std::shared_ptr<DataNodeConnection> dn, + std::shared_ptr<LibhdfsEvents> event_handlers) +{ + std::shared_ptr<BlockReader> reader = std::make_shared<BlockReaderImpl>(options, dn, cancel_state_, event_handlers); + + LOG_TRACE(kFileHandle, << "FileHandleImpl::CreateBlockReader(" << FMT_THIS_ADDR + << ", ..., dnconn=" << dn.get() + << ") called. New BlockReader = " << reader.get()); + + readers_.AddReader(reader); + return reader; +} + +std::shared_ptr<DataNodeConnection> FileHandleImpl::CreateDataNodeConnection( + ::asio::io_service * io_service, + const ::hadoop::hdfs::DatanodeInfoProto & dn, + const hadoop::common::TokenProto * token) { + LOG_TRACE(kFileHandle, << "FileHandleImpl::CreateDataNodeConnection(" + << FMT_THIS_ADDR << ", ...) called"); + return std::make_shared<DataNodeConnectionImpl>(io_service, dn, token, event_handlers_.get()); +} + +std::shared_ptr<LibhdfsEvents> FileHandleImpl::get_event_handlers() { + return event_handlers_; +} + +void FileHandleImpl::CancelOperations() { + LOG_INFO(kFileHandle, << "FileHandleImpl::CancelOperations(" + << FMT_THIS_ADDR << ") called"); + + cancel_state_->set_canceled(); + + /* Push update to BlockReaders that may be hung in an asio call */ + std::vector<std::shared_ptr<BlockReader>> live_readers = readers_.GetLiveReaders(); + for(auto reader : live_readers) { + reader->CancelOperation(); + } +} + +void FileHandleImpl::SetFileEventCallback(file_event_callback callback) { + std::shared_ptr<LibhdfsEvents> new_event_handlers; + if (event_handlers_) { + new_event_handlers = std::make_shared<LibhdfsEvents>(*event_handlers_); + } else { + new_event_handlers = std::make_shared<LibhdfsEvents>(); + } + new_event_handlers->set_file_callback(callback); + event_handlers_ = new_event_handlers; +} + + + +bool FileHandle::ShouldExclude(const Status &s) { + if (s.ok()) { + return false; + } + + switch (s.code()) { + /* client side resource exhaustion */ + case Status::kResourceUnavailable: + case Status::kOperationCanceled: + return false; + case Status::kInvalidArgument: + case Status::kUnimplemented: + case Status::kException: + default: + return true; + } +} + +uint64_t FileHandleImpl::get_bytes_read() { return bytes_read_.load(); } + +void FileHandleImpl::clear_bytes_read() { bytes_read_.store(0); } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h new file mode 100644 index 0000000..4135156 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBHDFSPP_LIB_FS_FILEHANDLE_H_ +#define LIBHDFSPP_LIB_FS_FILEHANDLE_H_ + +#include "common/hdfs_ioservice.h" +#include "common/async_stream.h" +#include "common/cancel_tracker.h" +#include "common/libhdfs_events_impl.h" +#include "common/new_delete.h" +#include "reader/fileinfo.h" +#include "reader/readergroup.h" + +#include "asio.hpp" +#include "bad_datanode_tracker.h" +#include "ClientNamenodeProtocol.pb.h" + +#include <mutex> +#include <iostream> + +namespace hdfs { + +class BlockReader; +struct BlockReaderOptions; +class DataNodeConnection; + +/* + * FileHandle: coordinates operations on a particular file in HDFS + * + * Threading model: not thread-safe; consumers and io_service should not call + * concurrently. PositionRead is the exceptions; they can be + * called concurrently and repeatedly. + * Lifetime: pointer returned to consumer by FileSystem::Open. Consumer is + * resonsible for freeing the object. + */ +class FileHandleImpl : public FileHandle { +public: + MEMCHECKED_CLASS(FileHandleImpl) + FileHandleImpl(const std::string & cluster_name, + const std::string & path, + ::asio::io_service *io_service, const std::string &client_name, + const std::shared_ptr<const struct FileInfo> file_info, + std::shared_ptr<BadDataNodeTracker> bad_data_nodes, + std::shared_ptr<LibhdfsEvents> event_handlers); + + /* + * Reads the file at the specified offset into the buffer. + * bytes_read returns the number of bytes successfully read on success + * and on error. Status::InvalidOffset is returned when trying to begin + * a read past the EOF. + */ + void PositionRead( + void *buf, + size_t buf_size, + uint64_t offset, + const std::function<void(const Status &status, size_t bytes_read)> &handler + ) override; + + /** + * Reads the file at the specified offset into the buffer. + * @param buf output buffer + * @param buf_size size of the output buffer + * @param offset offset at which to start reading + * @param bytes_read number of bytes successfully read + */ + Status PositionRead(void *buf, size_t buf_size, off_t offset, size_t *bytes_read) override; + Status Read(void *buf, size_t buf_size, size_t *bytes_read) override; + Status Seek(off_t *offset, std::ios_base::seekdir whence) override; + + + /* + * Reads some amount of data into the buffer. Will attempt to find the best + * datanode and read data from it. + * + * If an error occurs during connection or transfer, the callback will be + * called with bytes_read equal to the number of bytes successfully transferred. + * If no data nodes can be found, status will be Status::ResourceUnavailable. + * If trying to begin a read past the EOF, status will be Status::InvalidOffset. + * + */ + void AsyncPreadSome(size_t offset, const MutableBuffers &buffers, + std::shared_ptr<NodeExclusionRule> excluded_nodes, + const std::function<void(const Status &status, + const std::string &dn_id, size_t bytes_read)> handler); + + /** + * Cancels all operations instantiated from this FileHandle. + * Will set a flag to abort continuation pipelines when they try to move to the next step. + * Closes TCP connections to Datanode in order to abort pipelines waiting on slow IO. + **/ + virtual void CancelOperations(void) override; + + virtual void SetFileEventCallback(file_event_callback callback) override; + + /** + * Ephemeral objects created by the filehandle will need to get the event + * handler registry owned by the FileSystem. + **/ + std::shared_ptr<LibhdfsEvents> get_event_handlers(); + + /* how many bytes have been successfully read */ + virtual uint64_t get_bytes_read() override; + + /* resets the number of bytes read to zero */ + virtual void clear_bytes_read() override; + +protected: + virtual std::shared_ptr<BlockReader> CreateBlockReader(const BlockReaderOptions &options, + std::shared_ptr<DataNodeConnection> dn, + std::shared_ptr<hdfs::LibhdfsEvents> event_handlers); + virtual std::shared_ptr<DataNodeConnection> CreateDataNodeConnection( + ::asio::io_service *io_service, + const ::hadoop::hdfs::DatanodeInfoProto & dn, + const hadoop::common::TokenProto * token); +private: + const std::string cluster_name_; + const std::string path_; + ::asio::io_service * const io_service_; + const std::string client_name_; + const std::shared_ptr<const struct FileInfo> file_info_; + std::shared_ptr<BadDataNodeTracker> bad_node_tracker_; + bool CheckSeekBounds(ssize_t desired_position); + off_t offset_; + CancelHandle cancel_state_; + ReaderGroup readers_; + std::shared_ptr<LibhdfsEvents> event_handlers_; + std::atomic<uint64_t> bytes_read_; +}; + +} + +#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc new file mode 100644 index 0000000..56d02d8 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc @@ -0,0 +1,859 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "filesystem.h" + +#include "common/namenode_info.h" + +#include <functional> +#include <limits> +#include <future> +#include <tuple> +#include <iostream> +#include <pwd.h> +#include <fnmatch.h> + +#define FMT_THIS_ADDR "this=" << (void*)this + +namespace hdfs { + +static const char kNamenodeProtocol[] = "org.apache.hadoop.hdfs.protocol.ClientProtocol"; +static const int kNamenodeProtocolVersion = 1; + +using ::asio::ip::tcp; + +static constexpr uint16_t kDefaultPort = 8020; + +// forward declarations +const std::string get_effective_user_name(const std::string &); + +uint32_t FileSystem::GetDefaultFindMaxDepth() { + return std::numeric_limits<uint32_t>::max(); +} + +uint16_t FileSystem::GetDefaultPermissionMask() { + return 0755; +} + +Status FileSystem::CheckValidPermissionMask(uint16_t permissions) { + if (permissions > 01777) { + std::stringstream errormsg; + errormsg << "CheckValidPermissionMask: argument 'permissions' is " << std::oct + << std::showbase << permissions << " (should be between 0 and 01777)"; + return Status::InvalidArgument(errormsg.str().c_str()); + } + return Status::OK(); +} + +Status FileSystem::CheckValidReplication(uint16_t replication) { + if (replication < 1 || replication > 512) { + std::stringstream errormsg; + errormsg << "CheckValidReplication: argument 'replication' is " + << replication << " (should be between 1 and 512)"; + return Status::InvalidArgument(errormsg.str().c_str()); + } + return Status::OK(); +} + +FileSystem::~FileSystem() {} + +/***************************************************************************** + * FILESYSTEM BASE CLASS + ****************************************************************************/ + +FileSystem *FileSystem::New( + IoService *&io_service, const std::string &user_name, const Options &options) { + return new FileSystemImpl(io_service, user_name, options); +} + +FileSystem *FileSystem::New( + std::shared_ptr<IoService> io_service, const std::string &user_name, const Options &options) { + return new FileSystemImpl(io_service, user_name, options); +} + +FileSystem *FileSystem::New() { + // No, this pointer won't be leaked. The FileSystem takes ownership. + std::shared_ptr<IoService> io_service = IoService::MakeShared(); + if(!io_service) + return nullptr; + int thread_count = io_service->InitDefaultWorkers(); + if(thread_count < 1) + return nullptr; + + std::string user_name = get_effective_user_name(""); + Options options; + return new FileSystemImpl(io_service, user_name, options); +} + +/***************************************************************************** + * FILESYSTEM IMPLEMENTATION + ****************************************************************************/ + +const std::string get_effective_user_name(const std::string &user_name) { + if (!user_name.empty()) + return user_name; + + // If no user name was provided, try the HADOOP_USER_NAME and USER environment + // variables + const char * env = getenv("HADOOP_USER_NAME"); + if (env) { + return env; + } + + env = getenv("USER"); + if (env) { + return env; + } + + // If running on POSIX, use the currently logged in user +#if defined(_POSIX_VERSION) + uid_t uid = geteuid(); + struct passwd *pw = getpwuid(uid); + if (pw && pw->pw_name) + { + return pw->pw_name; + } +#endif + + return "unknown_user"; +} + +FileSystemImpl::FileSystemImpl(IoService *&io_service, const std::string &user_name, const Options &options) : + io_service_(static_cast<IoServiceImpl *>(io_service)), options_(options), + client_name_(GetRandomClientName()), + nn_( + &io_service_->io_service(), options, client_name_, + get_effective_user_name(user_name), kNamenodeProtocol, + kNamenodeProtocolVersion + ), + bad_node_tracker_(std::make_shared<BadDataNodeTracker>()), + event_handlers_(std::make_shared<LibhdfsEvents>()) +{ + + LOG_DEBUG(kFileSystem, << "FileSystemImpl::FileSystemImpl(" + << FMT_THIS_ADDR << ") called"); + + // Poor man's move + io_service = nullptr; + + unsigned int running_workers = 0; + if(options.io_threads_ < 1) { + LOG_DEBUG(kFileSystem, << "FileSystemImpl::FileSystemImpl Initializing default number of worker threads"); + running_workers = io_service_->InitDefaultWorkers(); + } else { + LOG_DEBUG(kFileSystem, << "FileSystemImpl::FileSystenImpl Initializing " << options_.io_threads_ << " worker threads."); + running_workers = io_service->InitWorkers(options_.io_threads_); + } + + if(running_workers < 1) { + LOG_WARN(kFileSystem, << "FileSystemImpl::FileSystemImpl was unable to start worker threads"); + } +} + +FileSystemImpl::FileSystemImpl(std::shared_ptr<IoService> io_service, const std::string& user_name, const Options &options) : + io_service_(std::static_pointer_cast<IoServiceImpl>(io_service)), options_(options), + client_name_(GetRandomClientName()), + nn_( + &io_service_->io_service(), options, client_name_, + get_effective_user_name(user_name), kNamenodeProtocol, + kNamenodeProtocolVersion + ), + bad_node_tracker_(std::make_shared<BadDataNodeTracker>()), + event_handlers_(std::make_shared<LibhdfsEvents>()) +{ + LOG_DEBUG(kFileSystem, << "FileSystemImpl::FileSystemImpl(" + << FMT_THIS_ADDR << ", shared IoService@" << io_service_.get() << ") called"); + int worker_thread_count = io_service_->get_worker_thread_count(); + if(worker_thread_count < 1) { + LOG_WARN(kFileSystem, << "FileSystemImpl::FileSystemImpl IoService provided doesn't have any worker threads. " + << "It needs at least 1 worker to connect to an HDFS cluster.") + } else { + LOG_DEBUG(kFileSystem, << "FileSystemImpl::FileSystemImpl using " << worker_thread_count << " worker threads."); + } +} + +FileSystemImpl::~FileSystemImpl() { + LOG_TRACE(kFileSystem, << "FileSystemImpl::~FileSystemImpl(" + << FMT_THIS_ADDR << ") called"); + + /** + * Note: IoService must be stopped before getting rid of worker threads. + * Once worker threads are joined and deleted the service can be deleted. + **/ + io_service_->Stop(); +} + +void FileSystemImpl::Connect(const std::string &server, + const std::string &service, + const std::function<void(const Status &, FileSystem * fs)> &handler) { + LOG_INFO(kFileSystem, << "FileSystemImpl::Connect(" << FMT_THIS_ADDR + << ", server=" << server << ", service=" + << service << ") called"); + connect_callback_.SetCallback(handler); + + /* IoService::New can return nullptr */ + if (!io_service_) { + handler (Status::Error("Null IoService"), this); + } + + // DNS lookup here for namenode(s) + std::vector<ResolvedNamenodeInfo> resolved_namenodes; + + auto name_service = options_.services.find(server); + if(name_service != options_.services.end()) { + cluster_name_ = name_service->first; + resolved_namenodes = BulkResolve(&io_service_->io_service(), name_service->second); + } else { + cluster_name_ = server + ":" + service; + + // tmp namenode info just to get this in the right format for BulkResolve + NamenodeInfo tmp_info; + try { + tmp_info.uri = URI::parse_from_string("hdfs://" + cluster_name_); + } catch (const uri_parse_error& e) { + LOG_ERROR(kFileSystem, << "Unable to use URI for cluster " << cluster_name_); + handler(Status::Error(("Invalid namenode " + cluster_name_ + " in config").c_str()), this); + } + + resolved_namenodes = BulkResolve(&io_service_->io_service(), {tmp_info}); + } + + for(unsigned int i=0;i<resolved_namenodes.size();i++) { + LOG_DEBUG(kFileSystem, << "Resolved Namenode"); + LOG_DEBUG(kFileSystem, << resolved_namenodes[i].str()); + } + + + nn_.Connect(cluster_name_, /*server, service*/ resolved_namenodes, [this](const Status & s) { + connect_callback_.GetCallback()(s, this); + }); +} + + +void FileSystemImpl::ConnectToDefaultFs(const std::function<void(const Status &, FileSystem *)> &handler) { + std::string scheme = options_.defaultFS.get_scheme(); + if (strcasecmp(scheme.c_str(), "hdfs") != 0) { + std::string error_message; + error_message += "defaultFS of [" + options_.defaultFS.str() + "] is not supported"; + handler(Status::InvalidArgument(error_message.c_str()), nullptr); + return; + } + + std::string host = options_.defaultFS.get_host(); + if (host.empty()) { + handler(Status::InvalidArgument("defaultFS must specify a hostname"), nullptr); + return; + } + + int16_t port = options_.defaultFS.get_port_or_default(kDefaultPort); + std::string port_as_string = std::to_string(port); + + Connect(host, port_as_string, handler); +} + +int FileSystemImpl::AddWorkerThread() { + LOG_DEBUG(kFileSystem, << "FileSystemImpl::AddWorkerThread(" + << FMT_THIS_ADDR << ") called." + << " Existing thread count = " << WorkerThreadCount()); + + if(!io_service_) + return -1; + + io_service_->AddWorkerThread(); + return 1; +} + +int FileSystemImpl::WorkerThreadCount() { + if(!io_service_) { + return -1; + } else { + return io_service_->get_worker_thread_count(); + } +} + +bool FileSystemImpl::CancelPendingConnect() { + if(connect_callback_.IsCallbackAccessed()) { + // Temp fix for failover hangs, allow CancelPendingConnect to be called so it can push a flag through the RPC engine + LOG_DEBUG(kFileSystem, << "FileSystemImpl@" << this << "::CancelPendingConnect called after Connect completed"); + return nn_.CancelPendingConnect(); + } + + if(!connect_callback_.IsCallbackSet()) { + LOG_DEBUG(kFileSystem, << "FileSystemImpl@" << this << "::CancelPendingConnect called before Connect started"); + return false; + } + + // First invoke callback, then do proper teardown in RpcEngine and RpcConnection + ConnectCallback noop_callback = [](const Status &stat, FileSystem *fs) { + LOG_DEBUG(kFileSystem, << "Dummy callback invoked for canceled FileSystem@" << fs << "::Connect with status: " << stat.ToString()); + }; + + bool callback_swapped = false; + ConnectCallback original_callback = connect_callback_.AtomicSwapCallback(noop_callback, callback_swapped); + + if(callback_swapped) { + // Take original callback and invoke it as if it was canceled. + LOG_DEBUG(kFileSystem, << "Swapped in dummy callback. Invoking connect callback with canceled status."); + std::function<void(void)> wrapped_callback = [original_callback, this](){ + // handling code expected to check status before dereferenceing 'this' + original_callback(Status::Canceled(), this); + }; + io_service_->PostTask(wrapped_callback); + } else { + LOG_INFO(kFileSystem, << "Unable to cancel FileSystem::Connect. It hasn't been invoked yet or may have already completed.") + return false; + } + + // Now push cancel down to clean up where possible and make sure the RpcEngine + // won't try to do retries in the background. The rest of the memory cleanup + // happens when this FileSystem is deleted by the user. + return nn_.CancelPendingConnect(); +} + +void FileSystemImpl::Open( + const std::string &path, + const std::function<void(const Status &, FileHandle *)> &handler) { + LOG_DEBUG(kFileSystem, << "FileSystemImpl::Open(" + << FMT_THIS_ADDR << ", path=" + << path << ") called"); + + nn_.GetBlockLocations(path, 0, std::numeric_limits<int64_t>::max(), [this, path, handler](const Status &stat, std::shared_ptr<const struct FileInfo> file_info) { + if(!stat.ok()) { + LOG_DEBUG(kFileSystem, << "FileSystemImpl::Open failed to get block locations. status=" << stat.ToString()); + if(stat.get_server_exception_type() == Status::kStandbyException) { + LOG_DEBUG(kFileSystem, << "Operation not allowed on standby datanode"); + } + } + handler(stat, stat.ok() ? new FileHandleImpl(cluster_name_, path, &io_service_->io_service(), client_name_, file_info, bad_node_tracker_, event_handlers_) + : nullptr); + }); +} + + +BlockLocation LocatedBlockToBlockLocation(const hadoop::hdfs::LocatedBlockProto & locatedBlock) +{ + BlockLocation result; + + result.setCorrupt(locatedBlock.corrupt()); + result.setOffset(locatedBlock.offset()); + + std::vector<DNInfo> dn_info; + dn_info.reserve(locatedBlock.locs_size()); + for (const hadoop::hdfs::DatanodeInfoProto & datanode_info: locatedBlock.locs()) { + const hadoop::hdfs::DatanodeIDProto &id = datanode_info.id(); + DNInfo newInfo; + if (id.has_ipaddr()) + newInfo.setIPAddr(id.ipaddr()); + if (id.has_hostname()) + newInfo.setHostname(id.hostname()); + if (id.has_xferport()) + newInfo.setXferPort(id.xferport()); + if (id.has_infoport()) + newInfo.setInfoPort(id.infoport()); + if (id.has_ipcport()) + newInfo.setIPCPort(id.ipcport()); + if (id.has_infosecureport()) + newInfo.setInfoSecurePort(id.infosecureport()); + if (datanode_info.has_location()) + newInfo.setNetworkLocation(datanode_info.location()); + dn_info.push_back(newInfo); + } + result.setDataNodes(dn_info); + + if (locatedBlock.has_b()) { + const hadoop::hdfs::ExtendedBlockProto & b=locatedBlock.b(); + result.setLength(b.numbytes()); + } + + + return result; +} + +void FileSystemImpl::GetBlockLocations(const std::string & path, uint64_t offset, uint64_t length, + const std::function<void(const Status &, std::shared_ptr<FileBlockLocation> locations)> handler) +{ + LOG_DEBUG(kFileSystem, << "FileSystemImpl::GetBlockLocations(" + << FMT_THIS_ADDR << ", path=" + << path << ") called"); + + //Protobuf gives an error 'Negative value is not supported' + //if the high bit is set in uint64 in GetBlockLocations + if (IsHighBitSet(offset)) { + handler(Status::InvalidArgument("GetBlockLocations: argument 'offset' cannot have high bit set"), nullptr); + return; + } + if (IsHighBitSet(length)) { + handler(Status::InvalidArgument("GetBlockLocations: argument 'length' cannot have high bit set"), nullptr); + return; + } + + auto conversion = [handler](const Status & status, std::shared_ptr<const struct FileInfo> fileInfo) { + if (status.ok()) { + auto result = std::make_shared<FileBlockLocation>(); + + result->setFileLength(fileInfo->file_length_); + result->setLastBlockComplete(fileInfo->last_block_complete_); + result->setUnderConstruction(fileInfo->under_construction_); + + std::vector<BlockLocation> blocks; + for (const hadoop::hdfs::LocatedBlockProto & locatedBlock: fileInfo->blocks_) { + auto newLocation = LocatedBlockToBlockLocation(locatedBlock); + blocks.push_back(newLocation); + } + result->setBlockLocations(blocks); + + handler(status, result); + } else { + handler(status, std::shared_ptr<FileBlockLocation>()); + } + }; + + nn_.GetBlockLocations(path, offset, length, conversion); +} + +void FileSystemImpl::GetPreferredBlockSize(const std::string &path, + const std::function<void(const Status &, const uint64_t &)> &handler) { + LOG_DEBUG(kFileSystem, << "FileSystemImpl::GetPreferredBlockSize(" + << FMT_THIS_ADDR << ", path=" + << path << ") called"); + + nn_.GetPreferredBlockSize(path, handler); +} + + +void FileSystemImpl::SetReplication(const std::string & path, int16_t replication, std::function<void(const Status &)> handler) { + LOG_DEBUG(kFileSystem, + << "FileSystemImpl::SetReplication(" << FMT_THIS_ADDR << ", path=" << path << + ", replication=" << replication << ") called"); + + if (path.empty()) { + handler(Status::InvalidArgument("SetReplication: argument 'path' cannot be empty")); + return; + } + Status replStatus = FileSystem::CheckValidReplication(replication); + if (!replStatus.ok()) { + handler(replStatus); + return; + } + + nn_.SetReplication(path, replication, handler); +} + + +void FileSystemImpl::SetTimes(const std::string & path, uint64_t mtime, uint64_t atime, + std::function<void(const Status &)> handler) { + LOG_DEBUG(kFileSystem, + << "FileSystemImpl::SetTimes(" << FMT_THIS_ADDR << ", path=" << path << + ", mtime=" << mtime << ", atime=" << atime << ") called"); + + if (path.empty()) { + handler(Status::InvalidArgument("SetTimes: argument 'path' cannot be empty")); + return; + } + + nn_.SetTimes(path, mtime, atime, handler); +} + + +void FileSystemImpl::GetFileInfo( + const std::string &path, + const std::function<void(const Status &, const StatInfo &)> &handler) { + LOG_DEBUG(kFileSystem, << "FileSystemImpl::GetFileInfo(" + << FMT_THIS_ADDR << ", path=" + << path << ") called"); + + nn_.GetFileInfo(path, handler); +} + +void FileSystemImpl::GetContentSummary( + const std::string &path, + const std::function<void(const Status &, const ContentSummary &)> &handler) { + LOG_DEBUG(kFileSystem, << "FileSystemImpl::GetContentSummary(" + << FMT_THIS_ADDR << ", path=" + << path << ") called"); + + nn_.GetContentSummary(path, handler); +} + +void FileSystemImpl::GetFsStats( + const std::function<void(const Status &, const FsInfo &)> &handler) { + LOG_DEBUG(kFileSystem, + << "FileSystemImpl::GetFsStats(" << FMT_THIS_ADDR << ") called"); + + nn_.GetFsStats(handler); +} + + +/** + * Helper function for recursive GetListing calls. + * + * Some compilers don't like recursive lambdas, so we make the lambda call a + * method, which in turn creates a lambda calling itself. + */ +void FileSystemImpl::GetListingShim(const Status &stat, const std::vector<StatInfo> & stat_infos, bool has_more, + std::string path, const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> &handler) { + bool has_next = !stat_infos.empty(); + bool get_more = handler(stat, stat_infos, has_more && has_next); + if (get_more && has_more && has_next ) { + auto callback = [this, path, handler](const Status &stat, const std::vector<StatInfo> & stat_infos, bool has_more) { + GetListingShim(stat, stat_infos, has_more, path, handler); + }; + + std::string last = stat_infos.back().path; + nn_.GetListing(path, callback, last); + } +} + +void FileSystemImpl::GetListing( + const std::string &path, + const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> &handler) { + LOG_DEBUG(kFileSystem, << "FileSystemImpl::GetListing(" + << FMT_THIS_ADDR << ", path=" + << path << ") called"); + std::string path_fixed = path; + if(path.back() != '/'){ + path_fixed += "/"; + } + // Caputure the state and push it into the shim + auto callback = [this, path_fixed, handler](const Status &stat, const std::vector<StatInfo> & stat_infos, bool has_more) { + GetListingShim(stat, stat_infos, has_more, path_fixed, handler); + }; + + nn_.GetListing(path_fixed, callback); +} + + +void FileSystemImpl::Mkdirs(const std::string & path, uint16_t permissions, bool createparent, + std::function<void(const Status &)> handler) { + LOG_DEBUG(kFileSystem, + << "FileSystemImpl::Mkdirs(" << FMT_THIS_ADDR << ", path=" << path << + ", permissions=" << permissions << ", createparent=" << createparent << ") called"); + + if (path.empty()) { + handler(Status::InvalidArgument("Mkdirs: argument 'path' cannot be empty")); + return; + } + + Status permStatus = FileSystem::CheckValidPermissionMask(permissions); + if (!permStatus.ok()) { + handler(permStatus); + return; + } + + nn_.Mkdirs(path, permissions, createparent, handler); +} + + +void FileSystemImpl::Delete(const std::string &path, bool recursive, + const std::function<void(const Status &)> &handler) { + LOG_DEBUG(kFileSystem, + << "FileSystemImpl::Delete(" << FMT_THIS_ADDR << ", path=" << path << ", recursive=" << recursive << ") called"); + + if (path.empty()) { + handler(Status::InvalidArgument("Delete: argument 'path' cannot be empty")); + return; + } + + nn_.Delete(path, recursive, handler); +} + + +void FileSystemImpl::Rename(const std::string &oldPath, const std::string &newPath, + const std::function<void(const Status &)> &handler) { + LOG_DEBUG(kFileSystem, + << "FileSystemImpl::Rename(" << FMT_THIS_ADDR << ", oldPath=" << oldPath << ", newPath=" << newPath << ") called"); + + if (oldPath.empty()) { + handler(Status::InvalidArgument("Rename: argument 'oldPath' cannot be empty")); + return; + } + + if (newPath.empty()) { + handler(Status::InvalidArgument("Rename: argument 'newPath' cannot be empty")); + return; + } + + nn_.Rename(oldPath, newPath, handler); +} + + +void FileSystemImpl::SetPermission(const std::string & path, + uint16_t permissions, const std::function<void(const Status &)> &handler) { + LOG_DEBUG(kFileSystem, + << "FileSystemImpl::SetPermission(" << FMT_THIS_ADDR << ", path=" << path << ", permissions=" << permissions << ") called"); + + if (path.empty()) { + handler(Status::InvalidArgument("SetPermission: argument 'path' cannot be empty")); + return; + } + Status permStatus = FileSystem::CheckValidPermissionMask(permissions); + if (!permStatus.ok()) { + handler(permStatus); + return; + } + + nn_.SetPermission(path, permissions, handler); +} + + +void FileSystemImpl::SetOwner(const std::string & path, const std::string & username, + const std::string & groupname, const std::function<void(const Status &)> &handler) { + LOG_DEBUG(kFileSystem, + << "FileSystemImpl::SetOwner(" << FMT_THIS_ADDR << ", path=" << path << ", username=" << username << ", groupname=" << groupname << ") called"); + + if (path.empty()) { + handler(Status::InvalidArgument("SetOwner: argument 'path' cannot be empty")); + return; + } + + nn_.SetOwner(path, username, groupname, handler); +} + + +/** + * Helper function for recursive Find calls. + * + * Some compilers don't like recursive lambdas, so we make the lambda call a + * method, which in turn creates a lambda calling itself. + * + * ***High-level explanation*** + * + * Since we are allowing to use wild cards in both path and name, we start by expanding the path first. + * Boolean search_path is set to true when we search for the path and false when we search for the name. + * When we search for the path we break the given path pattern into sub-directories. Starting from the + * first sub-directory we list them one-by-one and recursively continue into directories that matched the + * path pattern at the current depth. Directories that are large will be requested to continue sending + * the results. We keep track of the current depth within the path pattern in the 'depth' variable. + * This continues recursively until the depth reaches the end of the path. Next that we start matching + * the name pattern. All directories that we find we recurse now, and all names that match the given name + * pattern are being stored in outputs and later sent back to the user. + */ +void FileSystemImpl::FindShim(const Status &stat, const std::vector<StatInfo> & stat_infos, bool directory_has_more, + std::shared_ptr<FindOperationalState> operational_state, std::shared_ptr<FindSharedState> shared_state) { + //We buffer the outputs then send them back at the end + std::vector<StatInfo> outputs; + //Return on error + if(!stat.ok()){ + std::lock_guard<std::mutex> find_lock(shared_state->lock); + //We send true becuase we do not want the user code to exit before all our requests finished + shared_state->handler(stat, outputs, true); + shared_state->aborted = true; + } + if(!shared_state->aborted){ + //User did not abort the operation + if (directory_has_more) { + //Directory is large and has more results + //We launch another async call to get more results + shared_state->outstanding_requests++; + auto callback = [this, operational_state, shared_state](const Status &stat, const std::vector<StatInfo> & stat_infos, bool has_more) { + FindShim(stat, stat_infos, has_more, operational_state, shared_state); + }; + std::string last = stat_infos.back().path; + nn_.GetListing(operational_state->path, callback, last); + } + if(operational_state->search_path && operational_state->depth < shared_state->dirs.size() - 1){ + //We are searching for the path and did not reach the end of the path yet + for (StatInfo const& si : stat_infos) { + //If we are at the last depth and it matches both path and name, we need to output it. + if (operational_state->depth == shared_state->dirs.size() - 2 + && !fnmatch(shared_state->dirs[operational_state->depth + 1].c_str(), si.path.c_str(), 0) + && !fnmatch(shared_state->name.c_str(), si.path.c_str(), 0)) { + outputs.push_back(si); + } + //Skip if not directory + if(si.file_type != StatInfo::IS_DIR) { + continue; + } + //Checking for a match with the path at the current depth + if(!fnmatch(shared_state->dirs[operational_state->depth + 1].c_str(), si.path.c_str(), 0)){ + //Launch a new requests for every matched directory + shared_state->outstanding_requests++; + auto callback = [this, si, operational_state, shared_state](const Status &stat, const std::vector<StatInfo> & stat_infos, bool has_more) { + std::shared_ptr<FindOperationalState> new_current_state = std::make_shared<FindOperationalState>(si.full_path, operational_state->depth + 1, true); //true because searching for the path + FindShim(stat, stat_infos, has_more, new_current_state, shared_state); + }; + nn_.GetListing(si.full_path, callback); + } + } + } + else if(shared_state->maxdepth > operational_state->depth - shared_state->dirs.size() + 1){ + //We are searching for the name now and maxdepth has not been reached + for (StatInfo const& si : stat_infos) { + //Launch a new request for every directory + if(si.file_type == StatInfo::IS_DIR) { + shared_state->outstanding_requests++; + auto callback = [this, si, operational_state, shared_state](const Status &stat, const std::vector<StatInfo> & stat_infos, bool has_more) { + std::shared_ptr<FindOperationalState> new_current_state = std::make_shared<FindOperationalState>(si.full_path, operational_state->depth + 1, false); //false because searching for the name + FindShim(stat, stat_infos, has_more, new_current_state, shared_state); + }; + nn_.GetListing(si.full_path, callback); + } + //All names that match the specified name are saved to outputs + if(!fnmatch(shared_state->name.c_str(), si.path.c_str(), 0)){ + outputs.push_back(si); + } + } + } + } + //This section needs a lock to make sure we return the final chunk only once + //and no results are sent after aborted is set + std::lock_guard<std::mutex> find_lock(shared_state->lock); + //Decrement the counter once since we are done with this chunk + shared_state->outstanding_requests--; + if(shared_state->outstanding_requests == 0){ + //Send the outputs back to the user and notify that this is the final chunk + shared_state->handler(stat, outputs, false); + } else { + //There will be more results and we are not aborting + if (outputs.size() > 0 && !shared_state->aborted){ + //Send the outputs back to the user and notify that there is more + bool user_wants_more = shared_state->handler(stat, outputs, true); + if(!user_wants_more) { + //Abort if user doesn't want more + shared_state->aborted = true; + } + } + } +} + +void FileSystemImpl::Find( + const std::string &path, const std::string &name, const uint32_t maxdepth, + const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> &handler) { + LOG_DEBUG(kFileSystem, << "FileSystemImpl::Find(" + << FMT_THIS_ADDR << ", path=" + << path << ", name=" + << name << ") called"); + + //Populating the operational state, which includes: + //current search path, depth within the path, and the indication that we are currently searching for a path (not name yet). + std::shared_ptr<FindOperationalState> operational_state = std::make_shared<FindOperationalState>(path, 0, true); + //Populating the shared state, which includes: + //vector of sub-directories constructed from path, name to search, handler to use for result returning, outstanding_requests counter, and aborted flag. + std::shared_ptr<FindSharedState> shared_state = std::make_shared<FindSharedState>(path, name, maxdepth, handler, 1, false); + auto callback = [this, operational_state, shared_state](const Status &stat, const std::vector<StatInfo> & stat_infos, bool directory_has_more) { + FindShim(stat, stat_infos, directory_has_more, operational_state, shared_state); + }; + nn_.GetListing("/", callback); +} + + +void FileSystemImpl::CreateSnapshot(const std::string &path, + const std::string &name, + const std::function<void(const Status &)> &handler) { + LOG_DEBUG(kFileSystem, + << "FileSystemImpl::CreateSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ", name=" << name << ") called"); + + if (path.empty()) { + handler(Status::InvalidArgument("CreateSnapshot: argument 'path' cannot be empty")); + return; + } + + nn_.CreateSnapshot(path, name, handler); +} + + +void FileSystemImpl::DeleteSnapshot(const std::string &path, + const std::string &name, + const std::function<void(const Status &)> &handler) { + LOG_DEBUG(kFileSystem, + << "FileSystemImpl::DeleteSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ", name=" << name << ") called"); + + if (path.empty()) { + handler(Status::InvalidArgument("DeleteSnapshot: argument 'path' cannot be empty")); + return; + } + if (name.empty()) { + handler(Status::InvalidArgument("DeleteSnapshot: argument 'name' cannot be empty")); + return; + } + + nn_.DeleteSnapshot(path, name, handler); +} + +void FileSystemImpl::RenameSnapshot(const std::string &path, + const std::string &old_name, const std::string &new_name, + const std::function<void(const Status &)> &handler) { + LOG_DEBUG(kFileSystem, + << "FileSystemImpl::RenameSnapshot(" << FMT_THIS_ADDR << ", path=" << path << + ", old_name=" << old_name << ", new_name=" << new_name << ") called"); + + if (path.empty()) { + handler(Status::InvalidArgument("RenameSnapshot: argument 'path' cannot be empty")); + return; + } + if (old_name.empty()) { + handler(Status::InvalidArgument("RenameSnapshot: argument 'old_name' cannot be empty")); + return; + } + if (new_name.empty()) { + handler(Status::InvalidArgument("RenameSnapshot: argument 'new_name' cannot be empty")); + return; + } + + nn_.RenameSnapshot(path, old_name, new_name, handler); +} + +void FileSystemImpl::AllowSnapshot(const std::string &path, + const std::function<void(const Status &)> &handler) { + LOG_DEBUG(kFileSystem, + << "FileSystemImpl::AllowSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ") called"); + + if (path.empty()) { + handler(Status::InvalidArgument("AllowSnapshot: argument 'path' cannot be empty")); + return; + } + + nn_.AllowSnapshot(path, handler); +} + + +void FileSystemImpl::DisallowSnapshot(const std::string &path, + const std::function<void(const Status &)> &handler) { + LOG_DEBUG(kFileSystem, + << "FileSystemImpl::DisallowSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ") called"); + + if (path.empty()) { + handler(Status::InvalidArgument("DisallowSnapshot: argument 'path' cannot be empty")); + return; + } + + nn_.DisallowSnapshot(path, handler); +} + +void FileSystemImpl::SetFsEventCallback(fs_event_callback callback) { + if (event_handlers_) { + event_handlers_->set_fs_callback(callback); + nn_.SetFsEventCallback(callback); + } +} + + + +std::shared_ptr<LibhdfsEvents> FileSystemImpl::get_event_handlers() { + return event_handlers_; +} + +Options FileSystemImpl::get_options() { + return options_; +} + +std::string FileSystemImpl::get_cluster_name() { + return cluster_name_; +} + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h new file mode 100644 index 0000000..f2e9abd --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h @@ -0,0 +1,291 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBHDFSPP_LIB_FS_FILESYSTEM_H_ +#define LIBHDFSPP_LIB_FS_FILESYSTEM_H_ + +#include "filehandle.h" +#include "hdfspp/hdfspp.h" +#include "fs/bad_datanode_tracker.h" +#include "reader/block_reader.h" +#include "reader/fileinfo.h" + +#include "asio.hpp" + +#include <thread> +#include "namenode_operations.h" + +namespace hdfs { + +/* + * FileSystem: The consumer's main point of interaction with the cluster as + * a whole. + * + * Initially constructed in a disconnected state; call Connect before operating + * on the FileSystem. + * + * All open files must be closed before the FileSystem is destroyed. + * + * Threading model: thread-safe for all operations + * Lifetime: pointer created for consumer who is responsible for deleting it + */ +class FileSystemImpl : public FileSystem { +public: + MEMCHECKED_CLASS(FileSystemImpl) + typedef std::function<void(const Status &, FileSystem *)> ConnectCallback; + + explicit FileSystemImpl(IoService *&io_service, const std::string& user_name, const Options &options); + explicit FileSystemImpl(std::shared_ptr<IoService>, const std::string& user_name, const Options &options); + ~FileSystemImpl() override; + + /* attempt to connect to namenode, return bad status on failure */ + void Connect(const std::string &server, const std::string &service, + const std::function<void(const Status &, FileSystem *)> &handler) override; + /* attempt to connect to namenode, return bad status on failure */ + Status Connect(const std::string &server, const std::string &service) override; + + /* Connect to the NN indicated in options.defaultFs */ + virtual void ConnectToDefaultFs( + const std::function<void(const Status &, FileSystem *)> &handler) override; + virtual Status ConnectToDefaultFs() override; + + /* Cancel connection if FS is in the middle of one */ + virtual bool CancelPendingConnect() override; + + virtual void Open(const std::string &path, + const std::function<void(const Status &, FileHandle *)> + &handler) override; + Status Open(const std::string &path, FileHandle **handle) override; + + virtual void GetPreferredBlockSize(const std::string &path, + const std::function<void(const Status &, const uint64_t &)> &handler) override; + virtual Status GetPreferredBlockSize(const std::string &path, uint64_t & block_size) override; + + virtual void SetReplication(const std::string & path, int16_t replication, std::function<void(const Status &)> handler) override; + virtual Status SetReplication(const std::string & path, int16_t replication) override; + + void SetTimes(const std::string & path, uint64_t mtime, uint64_t atime, std::function<void(const Status &)> handler) override; + Status SetTimes(const std::string & path, uint64_t mtime, uint64_t atime) override; + + void GetFileInfo( + const std::string &path, + const std::function<void(const Status &, const StatInfo &)> &handler) override; + + Status GetFileInfo(const std::string &path, StatInfo & stat_info) override; + + void GetContentSummary(const std::string &path, + const std::function<void(const Status &, const ContentSummary &)> &handler) override; + Status GetContentSummary(const std::string &path, ContentSummary & stat_info) override; + + /** + * Retrieves the file system information such as the total raw size of all files in the filesystem + * and the raw capacity of the filesystem + * + * @param FsInfo struct to be populated by GetFsStats + **/ + void GetFsStats( + const std::function<void(const Status &, const FsInfo &)> &handler) override; + + Status GetFsStats(FsInfo & fs_info) override; + + void GetListing( + const std::string &path, + const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> &handler) override; + + Status GetListing(const std::string &path, std::vector<StatInfo> * stat_infos) override; + + virtual void GetBlockLocations(const std::string & path, uint64_t offset, uint64_t length, + const std::function<void(const Status &, std::shared_ptr<FileBlockLocation> locations)> ) override; + virtual Status GetBlockLocations(const std::string & path, uint64_t offset, uint64_t length, + std::shared_ptr<FileBlockLocation> * locations) override; + + virtual void Mkdirs(const std::string & path, uint16_t permissions, bool createparent, + std::function<void(const Status &)> handler) override; + virtual Status Mkdirs(const std::string & path, uint16_t permissions, bool createparent) override; + + virtual void Delete(const std::string &path, bool recursive, + const std::function<void(const Status &)> &handler) override; + virtual Status Delete(const std::string &path, bool recursive) override; + + virtual void Rename(const std::string &oldPath, const std::string &newPath, + const std::function<void(const Status &)> &handler) override; + virtual Status Rename(const std::string &oldPath, const std::string &newPath) override; + + virtual void SetPermission(const std::string & path, uint16_t permissions, + const std::function<void(const Status &)> &handler) override; + virtual Status SetPermission(const std::string & path, uint16_t permissions) override; + + virtual void SetOwner(const std::string & path, const std::string & username, + const std::string & groupname, const std::function<void(const Status &)> &handler) override; + virtual Status SetOwner(const std::string & path, + const std::string & username, const std::string & groupname) override; + + void Find( + const std::string &path, const std::string &name, const uint32_t maxdepth, + const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> &handler) override; + Status Find(const std::string &path, const std::string &name, const uint32_t maxdepth, std::vector<StatInfo> * stat_infos) override; + + /***************************************************************************** + * FILE SYSTEM SNAPSHOT FUNCTIONS + ****************************************************************************/ + + /** + * Creates a snapshot of a snapshottable directory specified by path + * + * @param path Path to the directory to be snapshotted (must be non-empty) + * @param name Name to be given to the created snapshot (may be empty) + **/ + void CreateSnapshot(const std::string &path, const std::string &name, + const std::function<void(const Status &)> &handler) override; + Status CreateSnapshot(const std::string &path, const std::string &name) override; + + /** + * Deletes the directory snapshot specified by path and name + * + * @param path Path to the snapshotted directory (must be non-empty) + * @param name Name of the snapshot to be deleted (must be non-empty) + **/ + void DeleteSnapshot(const std::string &path, const std::string &name, + const std::function<void(const Status &)> &handler) override; + Status DeleteSnapshot(const std::string &path, const std::string &name) override; + + /** + * Renames the directory snapshot specified by path from old_name to new_name + * + * @param path Path to the snapshotted directory (must be non-blank) + * @param old_name Current name of the snapshot (must be non-blank) + * @param new_name New name of the snapshot (must be non-blank) + **/ + void RenameSnapshot(const std::string &path, const std::string &old_name, + const std::string &new_name, const std::function<void(const Status &)> &handler) override; + Status RenameSnapshot(const std::string &path, const std::string &old_name, + const std::string &new_name) override; + + /** + * Allows snapshots to be made on the specified directory + * + * @param path Path to the directory to be made snapshottable (must be non-empty) + **/ + void AllowSnapshot(const std::string &path, const std::function<void(const Status &)> &handler) override; + Status AllowSnapshot(const std::string &path) override; + + /** + * Disallows snapshots to be made on the specified directory + * + * @param path Path to the directory to be made non-snapshottable (must be non-empty) + **/ + void DisallowSnapshot(const std::string &path, const std::function<void(const Status &)> &handler) override; + Status DisallowSnapshot(const std::string &path) override; + + void SetFsEventCallback(fs_event_callback callback) override; + + /* add a new thread to handle asio requests, return number of threads in pool + */ + int AddWorkerThread(); + + /* how many worker threads are servicing asio requests */ + int WorkerThreadCount(); + + /* all monitored events will need to lookup handlers */ + std::shared_ptr<LibhdfsEvents> get_event_handlers(); + + Options get_options() override; + + std::string get_cluster_name() override; + +private: + /** + * The IoService must be the first member variable to ensure that it gets + * destroyed last. This allows other members to dequeue things from the + * service in their own destructors. + * A side effect of this is that requests may outlive the RpcEngine they + * reference. + **/ + std::shared_ptr<IoServiceImpl> io_service_; + const Options options_; + const std::string client_name_; + std::string cluster_name_; + NameNodeOperations nn_; + std::shared_ptr<BadDataNodeTracker> bad_node_tracker_; + + // Keep connect callback around in case it needs to be canceled + SwappableCallbackHolder<ConnectCallback> connect_callback_; + + /** + * Runtime event monitoring handlers. + * Note: This is really handy to have for advanced usage but + * exposes implementation details that may change at any time. + **/ + std::shared_ptr<LibhdfsEvents> event_handlers_; + + void GetListingShim(const Status &stat, const std::vector<StatInfo> & stat_infos, bool has_more, + std::string path, const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> &handler); + + struct FindSharedState { + //Name pattern (can have wild-cards) to find + const std::string name; + //Maximum depth to recurse after the end of path is reached. + //Can be set to 0 for pure path globbing and ignoring name pattern entirely. + const uint32_t maxdepth; + //Vector of all sub-directories from the path argument (each can have wild-cards) + std::vector<std::string> dirs; + //Callback from Find + const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> handler; + //outstanding_requests is incremented once for every GetListing call. + std::atomic<uint64_t> outstanding_requests; + //Boolean needed to abort all recursion on error or on user command + std::atomic<bool> aborted; + //Shared variables will need protection with a lock + std::mutex lock; + FindSharedState(const std::string path_, const std::string name_, const uint32_t maxdepth_, + const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> handler_, + uint64_t outstanding_recuests_, bool aborted_) + : name(name_), + maxdepth(maxdepth_), + handler(handler_), + outstanding_requests(outstanding_recuests_), + aborted(aborted_), + lock() { + //Constructing the list of sub-directories + std::stringstream ss(path_); + if(path_.back() != '/'){ + ss << "/"; + } + for (std::string token; std::getline(ss, token, '/'); ) { + dirs.push_back(token); + } + } + }; + + struct FindOperationalState { + const std::string path; + const uint32_t depth; + const bool search_path; + FindOperationalState(const std::string path_, const uint32_t depth_, const bool search_path_) + : path(path_), + depth(depth_), + search_path(search_path_) { + } + }; + + void FindShim(const Status &stat, const std::vector<StatInfo> & stat_infos, + bool directory_has_more, std::shared_ptr<FindOperationalState> current_state, std::shared_ptr<FindSharedState> shared_state); + +}; +} + +#endif --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org