http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem_sync.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem_sync.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem_sync.cc new file mode 100644 index 0000000..53c9e26 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem_sync.cc @@ -0,0 +1,607 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "filesystem.h" + +#include <future> +#include <tuple> + +#define FMT_THIS_ADDR "this=" << (void*)this + +// Note: This is just a place to hold boilerplate async to sync shim code, +// place actual filesystem logic in filesystem.cc +// +// +// Shim pattern pseudocode +// +// Status MySynchronizedMethod(method_args): +// let stat = a promise<Status> wrapped in a shared_ptr +// +// Create a lambda that captures stat and any other variables that need to +// be set based on the async operation. When invoked set variables with the +// arguments passed (possibly do some translation), then set stat to indicate +// the return status of the async call. 
+// +// invoke MyAsyncMethod(method_args, handler_lambda) +// +// block until stat value has been set while async work takes place +// +// return stat + +namespace hdfs { + +Status FileSystemImpl::Connect(const std::string &server, const std::string &service) { + LOG_INFO(kFileSystem, << "FileSystemImpl::[sync]Connect(" << FMT_THIS_ADDR + << ", server=" << server << ", service=" << service << ") called"); + + /* synchronized */ + auto stat = std::make_shared<std::promise<Status>>(); + std::future<Status> future = stat->get_future(); + + auto callback = [stat](const Status &s, FileSystem *fs) { + (void)fs; + stat->set_value(s); + }; + + Connect(server, service, callback); + + /* block until promise is set */ + auto s = future.get(); + + return s; +} + + +Status FileSystemImpl::ConnectToDefaultFs() { + auto stat = std::make_shared<std::promise<Status>>(); + std::future<Status> future = stat->get_future(); + + auto callback = [stat](const Status &s, FileSystem *fs) { + (void)fs; + stat->set_value(s); + }; + + ConnectToDefaultFs(callback); + + /* block until promise is set */ + auto s = future.get(); + + return s; +} + + +Status FileSystemImpl::Open(const std::string &path, + FileHandle **handle) { + LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]Open(" + << FMT_THIS_ADDR << ", path=" + << path << ") called"); + + auto callstate = std::make_shared<std::promise<std::tuple<Status, FileHandle*>>>(); + std::future<std::tuple<Status, FileHandle*>> future(callstate->get_future()); + + /* wrap async FileSystem::Open with promise to make it a blocking call */ + auto h = [callstate](const Status &s, FileHandle *is) { + callstate->set_value(std::make_tuple(s, is)); + }; + + Open(path, h); + + /* block until promise is set */ + auto returnstate = future.get(); + Status stat = std::get<0>(returnstate); + FileHandle *file_handle = std::get<1>(returnstate); + + if (!stat.ok()) { + delete file_handle; + return stat; + } + if (!file_handle) { + return stat; + } + + *handle = file_handle; + return stat; +} + +Status FileSystemImpl::GetBlockLocations(const std::string & path, uint64_t offset, uint64_t length, + std::shared_ptr<FileBlockLocation> * fileBlockLocations) +{ + LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]GetBlockLocations(" + << FMT_THIS_ADDR << ", path=" + << path << ") called"); + + if (!fileBlockLocations) + return Status::InvalidArgument("Null pointer passed to GetBlockLocations"); + + auto callstate = std::make_shared<std::promise<std::tuple<Status, std::shared_ptr<FileBlockLocation>>>>(); + std::future<std::tuple<Status, std::shared_ptr<FileBlockLocation>>> future(callstate->get_future()); + + /* wrap async call with promise/future to make it blocking */ + auto callback = [callstate](const Status &s, std::shared_ptr<FileBlockLocation> blockInfo) { + callstate->set_value(std::make_tuple(s,blockInfo)); + }; + + GetBlockLocations(path, offset, length, callback); + + /* wait for async to finish */ + auto returnstate = future.get(); + auto stat = std::get<0>(returnstate); + + if (!stat.ok()) { + return stat; + } + + *fileBlockLocations = std::get<1>(returnstate); + + return stat; +} + +Status FileSystemImpl::GetPreferredBlockSize(const std::string &path, uint64_t & block_size) { + LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]GetPreferredBlockSize(" + << FMT_THIS_ADDR << ", path=" + << path << ") called"); + + auto callstate = std::make_shared<std::promise<std::tuple<Status, uint64_t>>>(); + std::future<std::tuple<Status, uint64_t>> future(callstate->get_future()); + + /* wrap async 
FileSystem::GetPreferredBlockSize with promise to make it a blocking call */ + auto h = [callstate](const Status &s, const uint64_t & bsize) { + callstate->set_value(std::make_tuple(s, bsize)); + }; + + GetPreferredBlockSize(path, h); + + /* block until promise is set */ + auto returnstate = future.get(); + Status stat = std::get<0>(returnstate); + uint64_t size = std::get<1>(returnstate); + + if (!stat.ok()) { + return stat; + } + + block_size = size; + return stat; +} + +Status FileSystemImpl::SetReplication(const std::string & path, int16_t replication) { + LOG_DEBUG(kFileSystem, + << "FileSystemImpl::[sync]SetReplication(" << FMT_THIS_ADDR << ", path=" << path << + ", replication=" << replication << ") called"); + + auto callstate = std::make_shared<std::promise<Status>>(); + std::future<Status> future(callstate->get_future()); + + /* wrap async FileSystem::SetReplication with promise to make it a blocking call */ + auto h = [callstate](const Status &s) { + callstate->set_value(s); + }; + + SetReplication(path, replication, h); + + /* block until promise is set */ + auto returnstate = future.get(); + Status stat = returnstate; + + return stat; +} + +Status FileSystemImpl::SetTimes(const std::string & path, uint64_t mtime, uint64_t atime) { + LOG_DEBUG(kFileSystem, + << "FileSystemImpl::[sync]SetTimes(" << FMT_THIS_ADDR << ", path=" << path << + ", mtime=" << mtime << ", atime=" << atime << ") called"); + + auto callstate = std::make_shared<std::promise<Status>>(); + std::future<Status> future(callstate->get_future()); + + /* wrap async FileSystem::SetTimes with promise to make it a blocking call */ + auto h = [callstate](const Status &s) { + callstate->set_value(s); + }; + + SetTimes(path, mtime, atime, h); + + /* block until promise is set */ + auto returnstate = future.get(); + Status stat = returnstate; + + return stat; +} + +Status FileSystemImpl::GetFileInfo(const std::string &path, + StatInfo & stat_info) { + LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]GetFileInfo(" + << FMT_THIS_ADDR << ", path=" + << path << ") called"); + + auto callstate = std::make_shared<std::promise<std::tuple<Status, StatInfo>>>(); + std::future<std::tuple<Status, StatInfo>> future(callstate->get_future()); + + /* wrap async FileSystem::GetFileInfo with promise to make it a blocking call */ + auto h = [callstate](const Status &s, const StatInfo &si) { + callstate->set_value(std::make_tuple(s, si)); + }; + + GetFileInfo(path, h); + + /* block until promise is set */ + auto returnstate = future.get(); + Status stat = std::get<0>(returnstate); + StatInfo info = std::get<1>(returnstate); + + if (!stat.ok()) { + return stat; + } + + stat_info = info; + return stat; +} + +Status FileSystemImpl::GetContentSummary(const std::string &path, + ContentSummary & content_summary) { + LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]GetContentSummary(" + << FMT_THIS_ADDR << ", path=" + << path << ") called"); + + auto callstate = std::make_shared<std::promise<std::tuple<Status, ContentSummary>>>(); + std::future<std::tuple<Status, ContentSummary>> future(callstate->get_future()); + + /* wrap async FileSystem::GetContentSummary with promise to make it a blocking call */ + auto h = [callstate](const Status &s, const ContentSummary &si) { + callstate->set_value(std::make_tuple(s, si)); + }; + + GetContentSummary(path, h); + + /* block until promise is set */ + auto returnstate = future.get(); + Status stat = std::get<0>(returnstate); + ContentSummary cs = std::get<1>(returnstate); + + if (!stat.ok()) { + return stat; 
+ } + + content_summary = cs; + return stat; +} + +Status FileSystemImpl::GetFsStats(FsInfo & fs_info) { + LOG_DEBUG(kFileSystem, + << "FileSystemImpl::[sync]GetFsStats(" << FMT_THIS_ADDR << ") called"); + + auto callstate = std::make_shared<std::promise<std::tuple<Status, FsInfo>>>(); + std::future<std::tuple<Status, FsInfo>> future(callstate->get_future()); + + /* wrap async FileSystem::GetFsStats with promise to make it a blocking call */ + auto h = [callstate](const Status &s, const FsInfo &si) { + callstate->set_value(std::make_tuple(s, si)); + }; + + GetFsStats(h); + + /* block until promise is set */ + auto returnstate = future.get(); + Status stat = std::get<0>(returnstate); + FsInfo info = std::get<1>(returnstate); + + if (!stat.ok()) { + return stat; + } + + fs_info = info; + return stat; +} + +Status FileSystemImpl::GetListing(const std::string &path, std::vector<StatInfo> * stat_infos) { + LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]GetListing(" + << FMT_THIS_ADDR << ", path=" + << path << ") called"); + + if (!stat_infos) { + return Status::InvalidArgument("FileSystemImpl::GetListing: argument 'stat_infos' cannot be NULL"); + } + + auto callstate = std::make_shared<std::promise<Status>>(); + std::future<Status> future(callstate->get_future()); + + /* wrap async FileSystem::GetListing with promise to make it a blocking call. + * + Keep requesting more until we get the entire listing, and don't set the promise + * until we have the entire listing. + */ + auto h = [callstate, stat_infos](const Status &s, const std::vector<StatInfo> & si, bool has_more) -> bool { + if (!si.empty()) { + stat_infos->insert(stat_infos->end(), si.begin(), si.end()); + } + + bool done = !s.ok() || !has_more; + if (done) { + callstate->set_value(s); + return false; + } + return true; + }; + + GetListing(path, h); + + /* block until promise is set */ + Status stat = future.get(); + + return stat; +} + +Status FileSystemImpl::Mkdirs(const std::string & path, uint16_t permissions, bool createparent) { + LOG_DEBUG(kFileSystem, + << "FileSystemImpl::[sync]Mkdirs(" << FMT_THIS_ADDR << ", path=" << path << + ", permissions=" << permissions << ", createparent=" << createparent << ") called"); + + auto callstate = std::make_shared<std::promise<Status>>(); + std::future<Status> future(callstate->get_future()); + + /* wrap async FileSystem::Mkdirs with promise to make it a blocking call */ + auto h = [callstate](const Status &s) { + callstate->set_value(s); + }; + + Mkdirs(path, permissions, createparent, h); + + /* block until promise is set */ + auto returnstate = future.get(); + Status stat = returnstate; + + return stat; +} + +Status FileSystemImpl::Delete(const std::string &path, bool recursive) { + LOG_DEBUG(kFileSystem, + << "FileSystemImpl::[sync]Delete(" << FMT_THIS_ADDR << ", path=" << path << ", recursive=" << recursive << ") called"); + + auto callstate = std::make_shared<std::promise<Status>>(); + std::future<Status> future(callstate->get_future()); + + /* wrap async FileSystem::Delete with promise to make it a blocking call */ + auto h = [callstate](const Status &s) { + callstate->set_value(s); + }; + + Delete(path, recursive, h); + + /* block until promise is set */ + auto returnstate = future.get(); + Status stat = returnstate; + + return stat; +} + +Status FileSystemImpl::Rename(const std::string &oldPath, const std::string &newPath) { + LOG_DEBUG(kFileSystem, + << "FileSystemImpl::[sync]Rename(" << FMT_THIS_ADDR << ", oldPath=" << oldPath << ", newPath=" << newPath << ") called"); + + auto 
callstate = std::make_shared<std::promise<Status>>(); + std::future<Status> future(callstate->get_future()); + + /* wrap async FileSystem::Rename with promise to make it a blocking call */ + auto h = [callstate](const Status &s) { + callstate->set_value(s); + }; + + Rename(oldPath, newPath, h); + + /* block until promise is set */ + auto returnstate = future.get(); + Status stat = returnstate; + + return stat; +} + +Status FileSystemImpl::SetPermission(const std::string & path, uint16_t permissions) { + LOG_DEBUG(kFileSystem, + << "FileSystemImpl::[sync]SetPermission(" << FMT_THIS_ADDR << ", path=" << path << ", permissions=" << permissions << ") called"); + + auto callstate = std::make_shared<std::promise<Status>>(); + std::future<Status> future(callstate->get_future()); + + /* wrap async FileSystem::SetPermission with promise to make it a blocking call */ + auto h = [callstate](const Status &s) { + callstate->set_value(s); + }; + + SetPermission(path, permissions, h); + + /* block until promise is set */ + Status stat = future.get(); + + return stat; +} + +Status FileSystemImpl::SetOwner(const std::string & path, const std::string & username, + const std::string & groupname) { + LOG_DEBUG(kFileSystem, + << "FileSystemImpl::[sync]SetOwner(" << FMT_THIS_ADDR << ", path=" << path << ", username=" << username << ", groupname=" << groupname << ") called"); + + auto callstate = std::make_shared<std::promise<Status>>(); + std::future<Status> future(callstate->get_future()); + + /* wrap async FileSystem::SetOwner with promise to make it a blocking call */ + auto h = [callstate](const Status &s) { + callstate->set_value(s); + }; + + SetOwner(path, username, groupname, h); + + /* block until promise is set */ + Status stat = future.get(); + return stat; +} + +Status FileSystemImpl::Find(const std::string &path, const std::string &name, const uint32_t maxdepth, std::vector<StatInfo> * stat_infos) { + LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]Find(" + << FMT_THIS_ADDR << ", path=" + << path << ", name=" + << name << ") called"); + + if (!stat_infos) { + return Status::InvalidArgument("FileSystemImpl::Find: argument 'stat_infos' cannot be NULL"); + } + + // In this case, we're going to have the async code populate stat_infos. + + std::promise<void> promise = std::promise<void>(); + std::future<void> future(promise.get_future()); + Status status = Status::OK(); + + /** + * Keep requesting more until we get the entire listing. Set the promise + * when we have the entire listing to stop. + * + * Find guarantees that the handler will only be called once at a time, + * so we do not need any locking here + */ + auto h = [&status, &promise, stat_infos](const Status &s, const std::vector<StatInfo> & si, bool has_more_results) -> bool { + if (!si.empty()) { + stat_infos->insert(stat_infos->end(), si.begin(), si.end()); + } + if (!s.ok() && status.ok()){ + //We make sure we set 'status' only on the first error. 
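// Also note that an error does not stop the traversal: the handler keeps
// returning true until has_more_results is false, so batches delivered
// after the first failure are still appended to stat_infos.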
+ status = s; + } + if (!has_more_results) { + promise.set_value(); + return false; + } + return true; + }; + + Find(path, name, maxdepth, h); + + /* block until promise is set */ + future.get(); + return status; +} + +Status FileSystemImpl::CreateSnapshot(const std::string &path, + const std::string &name) { + LOG_DEBUG(kFileSystem, + << "FileSystemImpl::[sync]CreateSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ", name=" << name << ") called"); + + auto callstate = std::make_shared<std::promise<Status>>(); + std::future<Status> future(callstate->get_future()); + + /* wrap async FileSystem::CreateSnapshot with promise to make it a blocking call */ + auto h = [callstate](const Status &s) { + callstate->set_value(s); + }; + + CreateSnapshot(path, name, h); + + /* block until promise is set */ + auto returnstate = future.get(); + Status stat = returnstate; + + return stat; +} + +Status FileSystemImpl::DeleteSnapshot(const std::string &path, + const std::string &name) { + LOG_DEBUG(kFileSystem, + << "FileSystemImpl::[sync]DeleteSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ", name=" << name << ") called"); + + auto callstate = std::make_shared<std::promise<Status>>(); + std::future<Status> future(callstate->get_future()); + + /* wrap async FileSystem::DeleteSnapshot with promise to make it a blocking call */ + auto h = [callstate](const Status &s) { + callstate->set_value(s); + }; + + DeleteSnapshot(path, name, h); + + /* block until promise is set */ + auto returnstate = future.get(); + Status stat = returnstate; + + return stat; +} + +Status FileSystemImpl::RenameSnapshot(const std::string &path, + const std::string &old_name, const std::string &new_name) { + LOG_DEBUG(kFileSystem, + << "FileSystemImpl::[sync]RenameSnapshot(" << FMT_THIS_ADDR << ", path=" << path << + ", old_name=" << old_name << ", new_name=" << new_name << ") called"); + + auto callstate = std::make_shared<std::promise<Status>>(); + std::future<Status> future(callstate->get_future()); + + /* wrap async FileSystem::RenameSnapshot with promise to make it a blocking call */ + auto h = [callstate](const Status &s) { + callstate->set_value(s); + }; + + RenameSnapshot(path, old_name, new_name, h); + + /* block until promise is set */ + auto returnstate = future.get(); + Status stat = returnstate; + + return stat; +} + +Status FileSystemImpl::AllowSnapshot(const std::string &path) { + LOG_DEBUG(kFileSystem, + << "FileSystemImpl::[sync]AllowSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ") called"); + + auto callstate = std::make_shared<std::promise<Status>>(); + std::future<Status> future(callstate->get_future()); + + /* wrap async FileSystem::AllowSnapshot with promise to make it a blocking call */ + auto h = [callstate](const Status &s) { + callstate->set_value(s); + }; + + AllowSnapshot(path, h); + + /* block until promise is set */ + auto returnstate = future.get(); + Status stat = returnstate; + + return stat; +} + +Status FileSystemImpl::DisallowSnapshot(const std::string &path) { + LOG_DEBUG(kFileSystem, + << "FileSystemImpl::[sync]DisallowSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ") called"); + + auto callstate = std::make_shared<std::promise<Status>>(); + std::future<Status> future(callstate->get_future()); + + /* wrap async FileSystem::DisallowSnapshot with promise to make it a blocking call */ + auto h = [callstate](const Status &s) { + callstate->set_value(s); + }; + + DisallowSnapshot(path, h); + + /* block until promise is set */ + auto returnstate = future.get(); + Status stat = 
returnstate; + + return stat; +} + +}
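The promise/future shim described in the header comment of this file generalizes to any callback-based API. Below is a minimal compilable sketch of the same pattern; AsyncTouch, Touch, and the toy Status type are hypothetical stand-ins for illustration, not part of this patch:

#include <functional>
#include <future>
#include <memory>
#include <string>
#include <thread>

// Toy stand-in for hdfs::Status.
struct Status {
  bool ok;
  static Status OK() { return Status{true}; }
};

// Hypothetical async operation: does its work elsewhere and invokes the
// handler exactly once when finished.
void AsyncTouch(const std::string &path,
                std::function<void(const Status &)> handler) {
  (void)path;
  std::thread([handler]() { handler(Status::OK()); }).detach();
}

// Synchronous wrapper, built the same way as the methods in filesystem_sync.cc.
Status Touch(const std::string &path) {
  // shared_ptr so the promise outlives this frame if the async layer holds
  // the callback briefly after set_value()
  auto stat = std::make_shared<std::promise<Status>>();
  std::future<Status> future = stat->get_future();

  AsyncTouch(path, [stat](const Status &s) { stat->set_value(s); });

  /* block until promise is set */
  return future.get();
}

As with the real code, nothing in this pattern enforces a timeout: if the handler is never invoked, the caller blocks forever.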
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.cc new file mode 100644 index 0000000..e46faad --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.cc @@ -0,0 +1,727 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "filesystem.h" +#include "common/continuation/asio.h" + +#include <asio/ip/tcp.hpp> + +#include <functional> +#include <limits> +#include <future> +#include <tuple> +#include <iostream> +#include <pwd.h> +#include <utility> + +#define FMT_THIS_ADDR "this=" << (void*)this + +using ::asio::ip::tcp; + +namespace hdfs { + +/***************************************************************************** + * NAMENODE OPERATIONS + ****************************************************************************/ + +void NameNodeOperations::Connect(const std::string &cluster_name, + const std::vector<ResolvedNamenodeInfo> &servers, + std::function<void(const Status &)> &&handler) { + engine_->Connect(cluster_name, servers, handler); +} + +bool NameNodeOperations::CancelPendingConnect() { + return engine_->CancelPendingConnect(); +} + +void NameNodeOperations::GetBlockLocations(const std::string & path, uint64_t offset, uint64_t length, + std::function<void(const Status &, std::shared_ptr<const struct FileInfo>)> handler) +{ + using ::hadoop::hdfs::GetBlockLocationsRequestProto; + using ::hadoop::hdfs::GetBlockLocationsResponseProto; + + LOG_TRACE(kFileSystem, << "NameNodeOperations::GetBlockLocations(" + << FMT_THIS_ADDR << ", path=" << path << ", ...) 
called"); + + if (path.empty()) { + handler(Status::InvalidArgument("GetBlockLocations: argument 'path' cannot be empty"), nullptr); + return; + } + + //Protobuf gives an error 'Negative value is not supported' + //if the high bit is set in uint64 in GetBlockLocations + if (IsHighBitSet(offset)) { + handler(Status::InvalidArgument("GetBlockLocations: argument 'offset' cannot have high bit set"), nullptr); + return; + } + if (IsHighBitSet(length)) { + handler(Status::InvalidArgument("GetBlockLocations: argument 'length' cannot have high bit set"), nullptr); + return; + } + + GetBlockLocationsRequestProto req; + req.set_src(path); + req.set_offset(offset); + req.set_length(length); + + auto resp = std::make_shared<GetBlockLocationsResponseProto>(); + + namenode_.GetBlockLocations(&req, resp, [resp, handler](const Status &stat) { + if (stat.ok()) { + auto file_info = std::make_shared<struct FileInfo>(); + auto locations = resp->locations(); + + file_info->file_length_ = locations.filelength(); + file_info->last_block_complete_ = locations.islastblockcomplete(); + file_info->under_construction_ = locations.underconstruction(); + + for (const auto &block : locations.blocks()) { + file_info->blocks_.push_back(block); + } + + if (!locations.islastblockcomplete() && + locations.has_lastblock() && locations.lastblock().b().numbytes()) { + file_info->blocks_.push_back(locations.lastblock()); + file_info->file_length_ += locations.lastblock().b().numbytes(); + } + + handler(stat, file_info); + } else { + handler(stat, nullptr); + } + }); +} + +void NameNodeOperations::GetPreferredBlockSize(const std::string & path, + std::function<void(const Status &, const uint64_t)> handler) +{ + using ::hadoop::hdfs::GetPreferredBlockSizeRequestProto; + using ::hadoop::hdfs::GetPreferredBlockSizeResponseProto; + + LOG_TRACE(kFileSystem, << "NameNodeOperations::GetPreferredBlockSize(" + << FMT_THIS_ADDR << ", path=" << path << ") called"); + + if (path.empty()) { + handler(Status::InvalidArgument("GetPreferredBlockSize: argument 'path' cannot be empty"), -1); + return; + } + + GetPreferredBlockSizeRequestProto req; + req.set_filename(path); + + auto resp = std::make_shared<GetPreferredBlockSizeResponseProto>(); + + namenode_.GetPreferredBlockSize(&req, resp, [resp, handler, path](const Status &stat) { + if (stat.ok() && resp -> has_bsize()) { + uint64_t block_size = resp -> bsize(); + handler(stat, block_size); + } else { + handler(stat, -1); + } + }); +} + +void NameNodeOperations::SetReplication(const std::string & path, int16_t replication, + std::function<void(const Status &)> handler) +{ + using ::hadoop::hdfs::SetReplicationRequestProto; + using ::hadoop::hdfs::SetReplicationResponseProto; + + LOG_TRACE(kFileSystem, + << "NameNodeOperations::SetReplication(" << FMT_THIS_ADDR << ", path=" << path << + ", replication=" << replication << ") called"); + + if (path.empty()) { + handler(Status::InvalidArgument("SetReplication: argument 'path' cannot be empty")); + return; + } + Status replStatus = FileSystemImpl::CheckValidReplication(replication); + if (!replStatus.ok()) { + handler(replStatus); + return; + } + SetReplicationRequestProto req; + req.set_src(path); + req.set_replication(replication); + + auto resp = std::make_shared<SetReplicationResponseProto>(); + + namenode_.SetReplication(&req, resp, [resp, handler, path](const Status &stat) { + if (stat.ok()) { + // Checking resp + if(resp -> has_result() && resp ->result() == 1) { + handler(stat); + } else { + //NameNode does not specify why there is no 
result, in my testing it was happening when the path is not found + std::string errormsg = "No such file or directory: " + path; + Status statNew = Status::PathNotFound(errormsg.c_str()); + handler(statNew); + } + } else { + handler(stat); + } + }); +} + +void NameNodeOperations::SetTimes(const std::string & path, uint64_t mtime, uint64_t atime, + std::function<void(const Status &)> handler) +{ + using ::hadoop::hdfs::SetTimesRequestProto; + using ::hadoop::hdfs::SetTimesResponseProto; + + LOG_TRACE(kFileSystem, + << "NameNodeOperations::SetTimes(" << FMT_THIS_ADDR << ", path=" << path << + ", mtime=" << mtime << ", atime=" << atime << ") called"); + + if (path.empty()) { + handler(Status::InvalidArgument("SetTimes: argument 'path' cannot be empty")); + return; + } + + SetTimesRequestProto req; + req.set_src(path); + req.set_mtime(mtime); + req.set_atime(atime); + + auto resp = std::make_shared<SetTimesResponseProto>(); + + namenode_.SetTimes(&req, resp, [resp, handler, path](const Status &stat) { + handler(stat); + }); +} + + + +void NameNodeOperations::GetFileInfo(const std::string & path, + std::function<void(const Status &, const StatInfo &)> handler) +{ + using ::hadoop::hdfs::GetFileInfoRequestProto; + using ::hadoop::hdfs::GetFileInfoResponseProto; + + LOG_TRACE(kFileSystem, << "NameNodeOperations::GetFileInfo(" + << FMT_THIS_ADDR << ", path=" << path << ") called"); + + if (path.empty()) { + handler(Status::InvalidArgument("GetFileInfo: argument 'path' cannot be empty"), StatInfo()); + return; + } + + GetFileInfoRequestProto req; + req.set_src(path); + + auto resp = std::make_shared<GetFileInfoResponseProto>(); + + namenode_.GetFileInfo(&req, resp, [resp, handler, path](const Status &stat) { + if (stat.ok()) { + // For non-existant files, the server will respond with an OK message but + // no fs in the protobuf. + if(resp -> has_fs()){ + struct StatInfo stat_info; + stat_info.path = path; + stat_info.full_path = path; + HdfsFileStatusProtoToStatInfo(stat_info, resp->fs()); + handler(stat, stat_info); + } else { + std::string errormsg = "No such file or directory: " + path; + Status statNew = Status::PathNotFound(errormsg.c_str()); + handler(statNew, StatInfo()); + } + } else { + handler(stat, StatInfo()); + } + }); +} + +void NameNodeOperations::GetContentSummary(const std::string & path, + std::function<void(const Status &, const ContentSummary &)> handler) +{ + using ::hadoop::hdfs::GetContentSummaryRequestProto; + using ::hadoop::hdfs::GetContentSummaryResponseProto; + + LOG_TRACE(kFileSystem, << "NameNodeOperations::GetContentSummary(" + << FMT_THIS_ADDR << ", path=" << path << ") called"); + + if (path.empty()) { + handler(Status::InvalidArgument("GetContentSummary: argument 'path' cannot be empty"), ContentSummary()); + return; + } + + GetContentSummaryRequestProto req; + req.set_path(path); + + auto resp = std::make_shared<GetContentSummaryResponseProto>(); + + namenode_.GetContentSummary(&req, resp, [resp, handler, path](const Status &stat) { + if (stat.ok()) { + // For non-existant files, the server will respond with an OK message but + // no summary in the protobuf. 
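// As in GetFileInfo above, an RPC that succeeds but carries no summary
// field is translated into a PathNotFound status rather than reported as OK.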
+ if(resp -> has_summary()){ + struct ContentSummary content_summary; + content_summary.path = path; + ContentSummaryProtoToContentSummary(content_summary, resp->summary()); + handler(stat, content_summary); + } else { + std::string errormsg = "No such file or directory: " + path; + Status statNew = Status::PathNotFound(errormsg.c_str()); + handler(statNew, ContentSummary()); + } + } else { + handler(stat, ContentSummary()); + } + }); +} + +void NameNodeOperations::GetFsStats( + std::function<void(const Status &, const FsInfo &)> handler) { + using ::hadoop::hdfs::GetFsStatusRequestProto; + using ::hadoop::hdfs::GetFsStatsResponseProto; + + LOG_TRACE(kFileSystem, + << "NameNodeOperations::GetFsStats(" << FMT_THIS_ADDR << ") called"); + + GetFsStatusRequestProto req; + auto resp = std::make_shared<GetFsStatsResponseProto>(); + + namenode_.GetFsStats(&req, resp, [resp, handler](const Status &stat) { + if (stat.ok()) { + struct FsInfo fs_info; + GetFsStatsResponseProtoToFsInfo(fs_info, resp); + handler(stat, fs_info); + } else { + handler(stat, FsInfo()); + } + }); +} + +void NameNodeOperations::GetListing( + const std::string & path, + std::function<void(const Status &, const std::vector<StatInfo> &, bool)> handler, + const std::string & start_after) { + using ::hadoop::hdfs::GetListingRequestProto; + using ::hadoop::hdfs::GetListingResponseProto; + + LOG_TRACE( + kFileSystem, + << "NameNodeOperations::GetListing(" << FMT_THIS_ADDR << ", path=" << path << ") called"); + + if (path.empty()) { + std::vector<StatInfo> empty; + handler(Status::InvalidArgument("GetListing: argument 'path' cannot be empty"), empty, false); + return; + } + + GetListingRequestProto req; + req.set_src(path); + req.set_startafter(start_after.c_str()); + req.set_needlocation(false); + + auto resp = std::make_shared<GetListingResponseProto>(); + + namenode_.GetListing(&req, resp, [resp, handler, path](const Status &stat) { + std::vector<StatInfo> stat_infos; + if (stat.ok()) { + if(resp -> has_dirlist()){ + for (::hadoop::hdfs::HdfsFileStatusProto const& fs : resp->dirlist().partiallisting()) { + StatInfo si; + si.path = fs.path(); + si.full_path = path + fs.path(); + if(si.full_path.back() != '/'){ + si.full_path += "/"; + } + HdfsFileStatusProtoToStatInfo(si, fs); + stat_infos.push_back(si); + } + handler(stat, stat_infos, resp->dirlist().remainingentries() > 0); + } else { + std::string errormsg = "No such file or directory: " + path; + handler(Status::PathNotFound(errormsg.c_str()), stat_infos, false); + } + } else { + handler(stat, stat_infos, false); + } + }); +} + +void NameNodeOperations::Mkdirs(const std::string & path, uint16_t permissions, bool createparent, + std::function<void(const Status &)> handler) +{ + using ::hadoop::hdfs::MkdirsRequestProto; + using ::hadoop::hdfs::MkdirsResponseProto; + + LOG_TRACE(kFileSystem, + << "NameNodeOperations::Mkdirs(" << FMT_THIS_ADDR << ", path=" << path << + ", permissions=" << permissions << ", createparent=" << createparent << ") called"); + + if (path.empty()) { + handler(Status::InvalidArgument("Mkdirs: argument 'path' cannot be empty")); + return; + } + + MkdirsRequestProto req; + Status permStatus = FileSystemImpl::CheckValidPermissionMask(permissions); + if (!permStatus.ok()) { + handler(permStatus); + return; + } + req.set_src(path); + hadoop::hdfs::FsPermissionProto *perm = req.mutable_masked(); + perm->set_perm(permissions); + req.set_createparent(createparent); + + auto resp = std::make_shared<MkdirsResponseProto>(); + + namenode_.Mkdirs(&req, resp, [resp, 
handler, path](const Status &stat) { + if (stat.ok()) { + // Checking resp + if(resp -> has_result() && resp ->result() == 1) { + handler(stat); + } else { + //NameNode does not specify why there is no result, in my testing it was happening when the path is not found + std::string errormsg = "No such file or directory: " + path; + Status statNew = Status::PathNotFound(errormsg.c_str()); + handler(statNew); + } + } else { + handler(stat); + } + }); +} + +void NameNodeOperations::Delete(const std::string & path, bool recursive, std::function<void(const Status &)> handler) { + using ::hadoop::hdfs::DeleteRequestProto; + using ::hadoop::hdfs::DeleteResponseProto; + + LOG_TRACE(kFileSystem, + << "NameNodeOperations::Delete(" << FMT_THIS_ADDR << ", path=" << path << ", recursive=" << recursive << ") called"); + + if (path.empty()) { + handler(Status::InvalidArgument("Delete: argument 'path' cannot be empty")); + return; + } + + DeleteRequestProto req; + req.set_src(path); + req.set_recursive(recursive); + + auto resp = std::make_shared<DeleteResponseProto>(); + + namenode_.Delete(&req, resp, [resp, handler, path](const Status &stat) { + if (stat.ok()) { + // Checking resp + if(resp -> has_result() && resp ->result() == 1) { + handler(stat); + } else { + //NameNode does not specify why there is no result, in my testing it was happening when the path is not found + std::string errormsg = "No such file or directory: " + path; + Status statNew = Status::PathNotFound(errormsg.c_str()); + handler(statNew); + } + } else { + handler(stat); + } + }); +} + +void NameNodeOperations::Rename(const std::string & oldPath, const std::string & newPath, std::function<void(const Status &)> handler) { + using ::hadoop::hdfs::RenameRequestProto; + using ::hadoop::hdfs::RenameResponseProto; + + LOG_TRACE(kFileSystem, + << "NameNodeOperations::Rename(" << FMT_THIS_ADDR << ", oldPath=" << oldPath << ", newPath=" << newPath << ") called"); + + if (oldPath.empty()) { + handler(Status::InvalidArgument("Rename: argument 'oldPath' cannot be empty")); + return; + } + + if (newPath.empty()) { + handler(Status::InvalidArgument("Rename: argument 'newPath' cannot be empty")); + return; + } + + RenameRequestProto req; + req.set_src(oldPath); + req.set_dst(newPath); + + auto resp = std::make_shared<RenameResponseProto>(); + + namenode_.Rename(&req, resp, [resp, handler](const Status &stat) { + if (stat.ok()) { + // Checking resp + if(resp -> has_result() && resp ->result() == 1) { + handler(stat); + } else { + //Since NameNode does not specify why the result is not success, we set the general error + std::string errormsg = "oldPath and parent directory of newPath must exist. 
newPath must not exist."; + Status statNew = Status::InvalidArgument(errormsg.c_str()); + handler(statNew); + } + } else { + handler(stat); + } + }); +} + +void NameNodeOperations::SetPermission(const std::string & path, + uint16_t permissions, std::function<void(const Status &)> handler) { + using ::hadoop::hdfs::SetPermissionRequestProto; + using ::hadoop::hdfs::SetPermissionResponseProto; + + LOG_TRACE(kFileSystem, + << "NameNodeOperations::SetPermission(" << FMT_THIS_ADDR << ", path=" << path << ", permissions=" << permissions << ") called"); + + if (path.empty()) { + handler(Status::InvalidArgument("SetPermission: argument 'path' cannot be empty")); + return; + } + Status permStatus = FileSystemImpl::CheckValidPermissionMask(permissions); + if (!permStatus.ok()) { + handler(permStatus); + return; + } + + SetPermissionRequestProto req; + req.set_src(path); + + hadoop::hdfs::FsPermissionProto *perm = req.mutable_permission(); + perm->set_perm(permissions); + + auto resp = std::make_shared<SetPermissionResponseProto>(); + + namenode_.SetPermission(&req, resp, + [handler](const Status &stat) { + handler(stat); + }); +} + +void NameNodeOperations::SetOwner(const std::string & path, + const std::string & username, const std::string & groupname, std::function<void(const Status &)> handler) { + using ::hadoop::hdfs::SetOwnerRequestProto; + using ::hadoop::hdfs::SetOwnerResponseProto; + + LOG_TRACE(kFileSystem, + << "NameNodeOperations::SetOwner(" << FMT_THIS_ADDR << ", path=" << path << ", username=" << username << ", groupname=" << groupname << ") called"); + + if (path.empty()) { + handler(Status::InvalidArgument("SetOwner: argument 'path' cannot be empty")); + return; + } + + SetOwnerRequestProto req; + req.set_src(path); + + if(!username.empty()) { + req.set_username(username); + } + if(!groupname.empty()) { + req.set_groupname(groupname); + } + + auto resp = std::make_shared<SetOwnerResponseProto>(); + + namenode_.SetOwner(&req, resp, + [handler](const Status &stat) { + handler(stat); + }); +} + +void NameNodeOperations::CreateSnapshot(const std::string & path, + const std::string & name, std::function<void(const Status &)> handler) { + using ::hadoop::hdfs::CreateSnapshotRequestProto; + using ::hadoop::hdfs::CreateSnapshotResponseProto; + + LOG_TRACE(kFileSystem, + << "NameNodeOperations::CreateSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ", name=" << name << ") called"); + + if (path.empty()) { + handler(Status::InvalidArgument("CreateSnapshot: argument 'path' cannot be empty")); + return; + } + + CreateSnapshotRequestProto req; + req.set_snapshotroot(path); + if (!name.empty()) { + req.set_snapshotname(name); + } + + auto resp = std::make_shared<CreateSnapshotResponseProto>(); + + namenode_.CreateSnapshot(&req, resp, + [handler](const Status &stat) { + handler(stat); + }); +} + +void NameNodeOperations::DeleteSnapshot(const std::string & path, + const std::string & name, std::function<void(const Status &)> handler) { + using ::hadoop::hdfs::DeleteSnapshotRequestProto; + using ::hadoop::hdfs::DeleteSnapshotResponseProto; + + LOG_TRACE(kFileSystem, + << "NameNodeOperations::DeleteSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ", name=" << name << ") called"); + + if (path.empty()) { + handler(Status::InvalidArgument("DeleteSnapshot: argument 'path' cannot be empty")); + return; + } + if (name.empty()) { + handler(Status::InvalidArgument("DeleteSnapshot: argument 'name' cannot be empty")); + return; + } + + DeleteSnapshotRequestProto req; + req.set_snapshotroot(path); + 
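// Unlike CreateSnapshot, where the snapshot name is optional, DeleteSnapshot
// requires it; both fields were validated as non-empty above.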
req.set_snapshotname(name); + + auto resp = std::make_shared<DeleteSnapshotResponseProto>(); + + namenode_.DeleteSnapshot(&req, resp, + [handler](const Status &stat) { + handler(stat); + }); +} + +void NameNodeOperations::RenameSnapshot(const std::string & path, const std::string & old_name, + const std::string & new_name, std::function<void(const Status &)> handler) { + using ::hadoop::hdfs::RenameSnapshotRequestProto; + using ::hadoop::hdfs::RenameSnapshotResponseProto; + + LOG_TRACE(kFileSystem, + << "NameNodeOperations::RenameSnapshot(" << FMT_THIS_ADDR << ", path=" << path << + ", old_name=" << old_name << ", new_name=" << new_name << ") called"); + + if (path.empty()) { + handler(Status::InvalidArgument("RenameSnapshot: argument 'path' cannot be empty")); + return; + } + if (old_name.empty()) { + handler(Status::InvalidArgument("RenameSnapshot: argument 'old_name' cannot be empty")); + return; + } + if (new_name.empty()) { + handler(Status::InvalidArgument("RenameSnapshot: argument 'new_name' cannot be empty")); + return; + } + + RenameSnapshotRequestProto req; + req.set_snapshotroot(path); + req.set_snapshotoldname(old_name); + req.set_snapshotnewname(new_name); + + auto resp = std::make_shared<RenameSnapshotResponseProto>(); + + namenode_.RenameSnapshot(&req, resp, + [handler](const Status &stat) { + handler(stat); + }); +} + +void NameNodeOperations::AllowSnapshot(const std::string & path, std::function<void(const Status &)> handler) { + using ::hadoop::hdfs::AllowSnapshotRequestProto; + using ::hadoop::hdfs::AllowSnapshotResponseProto; + + LOG_TRACE(kFileSystem, + << "NameNodeOperations::AllowSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ") called"); + + if (path.empty()) { + handler(Status::InvalidArgument("AllowSnapshot: argument 'path' cannot be empty")); + return; + } + + AllowSnapshotRequestProto req; + req.set_snapshotroot(path); + + auto resp = std::make_shared<AllowSnapshotResponseProto>(); + + namenode_.AllowSnapshot(&req, resp, + [handler](const Status &stat) { + handler(stat); + }); +} + +void NameNodeOperations::DisallowSnapshot(const std::string & path, std::function<void(const Status &)> handler) { + using ::hadoop::hdfs::DisallowSnapshotRequestProto; + using ::hadoop::hdfs::DisallowSnapshotResponseProto; + + LOG_TRACE(kFileSystem, + << "NameNodeOperations::DisallowSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ") called"); + + if (path.empty()) { + handler(Status::InvalidArgument("DisallowSnapshot: argument 'path' cannot be empty")); + return; + } + + DisallowSnapshotRequestProto req; + req.set_snapshotroot(path); + + auto resp = std::make_shared<DisallowSnapshotResponseProto>(); + + namenode_.DisallowSnapshot(&req, resp, + [handler](const Status &stat) { + handler(stat); + }); +} + +void NameNodeOperations::SetFsEventCallback(fs_event_callback callback) { + engine_->SetFsEventCallback(callback); +} + +void NameNodeOperations::HdfsFileStatusProtoToStatInfo( + hdfs::StatInfo & stat_info, + const ::hadoop::hdfs::HdfsFileStatusProto & fs) { + stat_info.file_type = fs.filetype(); + stat_info.length = fs.length(); + stat_info.permissions = fs.permission().perm(); + stat_info.owner = fs.owner(); + stat_info.group = fs.group(); + stat_info.modification_time = fs.modification_time(); + stat_info.access_time = fs.access_time(); + stat_info.symlink = fs.symlink(); + stat_info.block_replication = fs.block_replication(); + stat_info.blocksize = fs.blocksize(); + stat_info.fileid = fs.fileid(); + stat_info.children_num = fs.childrennum(); +} + +void 
NameNodeOperations::ContentSummaryProtoToContentSummary( + hdfs::ContentSummary & content_summary, + const ::hadoop::hdfs::ContentSummaryProto & csp) { + content_summary.length = csp.length(); + content_summary.filecount = csp.filecount(); + content_summary.directorycount = csp.directorycount(); + content_summary.quota = csp.quota(); + content_summary.spaceconsumed = csp.spaceconsumed(); + content_summary.spacequota = csp.spacequota(); +} + +void NameNodeOperations::GetFsStatsResponseProtoToFsInfo( + hdfs::FsInfo & fs_info, + const std::shared_ptr<::hadoop::hdfs::GetFsStatsResponseProto> & fs) { + fs_info.capacity = fs->capacity(); + fs_info.used = fs->used(); + fs_info.remaining = fs->remaining(); + fs_info.under_replicated = fs->under_replicated(); + fs_info.corrupt_blocks = fs->corrupt_blocks(); + fs_info.missing_blocks = fs->missing_blocks(); + fs_info.missing_repl_one_blocks = fs->missing_repl_one_blocks(); + if(fs->has_blocks_in_future()){ + fs_info.blocks_in_future = fs->blocks_in_future(); + } +} + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h new file mode 100644 index 0000000..f4caa18 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBHDFSPP_LIB_FS_NAMENODEOPERATIONS_H_ +#define LIBHDFSPP_LIB_FS_NAMENODEOPERATIONS_H_ + +#include "rpc/rpc_engine.h" +#include "hdfspp/statinfo.h" +#include "hdfspp/fsinfo.h" +#include "hdfspp/content_summary.h" +#include "common/namenode_info.h" +#include "ClientNamenodeProtocol.pb.h" +#include "ClientNamenodeProtocol.hrpc.inl" + + +namespace hdfs { + +/** +* NameNodeConnection: abstracts the details of communicating with a NameNode +* and the implementation of the communications protocol. +* +* Will eventually handle retry and failover. 
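*
* A typical call (illustrative sketch, not part of this patch) pairs each
* operation with a completion handler:
*
*   name_node_ops.GetFileInfo("/tmp/f",
*     [](const Status &s, const StatInfo &si) {
*       if (s.ok()) { ... consume si.length, si.owner, ... }
*     });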
+* +* Threading model: thread-safe; all operations can be called concurrently +* Lifetime: owned by a FileSystemImpl +*/ + +class NameNodeOperations { +public: + MEMCHECKED_CLASS(NameNodeOperations) + NameNodeOperations(::asio::io_service *io_service, const Options &options, + const std::string &client_name, const std::string &user_name, + const char *protocol_name, int protocol_version) : + io_service_(io_service), + engine_(std::make_shared<RpcEngine>(io_service, options, client_name, user_name, protocol_name, protocol_version)), + namenode_(engine_), options_(options) {} + + + void Connect(const std::string &cluster_name, + const std::vector<ResolvedNamenodeInfo> &servers, + std::function<void(const Status &)> &&handler); + + bool CancelPendingConnect(); + + void GetBlockLocations(const std::string & path, uint64_t offset, uint64_t length, + std::function<void(const Status &, std::shared_ptr<const struct FileInfo>)> handler); + + void GetPreferredBlockSize(const std::string & path, + std::function<void(const Status &, const uint64_t)> handler); + + void SetReplication(const std::string & path, int16_t replication, + std::function<void(const Status &)> handler); + + void SetTimes(const std::string & path, uint64_t mtime, uint64_t atime, + std::function<void(const Status &)> handler); + + void GetFileInfo(const std::string & path, + std::function<void(const Status &, const StatInfo &)> handler); + + void GetContentSummary(const std::string & path, + std::function<void(const Status &, const ContentSummary &)> handler); + + void GetFsStats(std::function<void(const Status &, const FsInfo &)> handler); + + // start_after="" for initial call + void GetListing(const std::string & path, + std::function<void(const Status &, const std::vector<StatInfo> &, bool)> handler, + const std::string & start_after = ""); + + void Mkdirs(const std::string & path, uint16_t permissions, bool createparent, + std::function<void(const Status &)> handler); + + void Delete(const std::string & path, bool recursive, + std::function<void(const Status &)> handler); + + void Rename(const std::string & oldPath, const std::string & newPath, + std::function<void(const Status &)> handler); + + void SetPermission(const std::string & path, uint16_t permissions, + std::function<void(const Status &)> handler); + + void SetOwner(const std::string & path, const std::string & username, + const std::string & groupname, std::function<void(const Status &)> handler); + + void CreateSnapshot(const std::string & path, const std::string & name, + std::function<void(const Status &)> handler); + + void DeleteSnapshot(const std::string & path, const std::string & name, + std::function<void(const Status &)> handler); + + void RenameSnapshot(const std::string & path, const std::string & old_name, const std::string & new_name, + std::function<void(const Status &)> handler); + + void AllowSnapshot(const std::string & path, + std::function<void(const Status &)> handler); + + void DisallowSnapshot(const std::string & path, + std::function<void(const Status &)> handler); + + void SetFsEventCallback(fs_event_callback callback); + +private: + static void HdfsFileStatusProtoToStatInfo(hdfs::StatInfo & si, const ::hadoop::hdfs::HdfsFileStatusProto & fs); + static void ContentSummaryProtoToContentSummary(hdfs::ContentSummary & content_summary, const ::hadoop::hdfs::ContentSummaryProto & csp); + static void DirectoryListingProtoToStatInfo(std::shared_ptr<std::vector<StatInfo>> stat_infos, const ::hadoop::hdfs::DirectoryListingProto & dl); + static void 
GetFsStatsResponseProtoToFsInfo(hdfs::FsInfo & fs_info, const std::shared_ptr<::hadoop::hdfs::GetFsStatsResponseProto> & fs); + + ::asio::io_service * io_service_; + + // This is the only permanent owner of the RpcEngine, however the RPC layer + // needs to reference count it prevent races during FileSystem destruction. + // In order to do this they hold weak_ptrs and promote them to shared_ptr + // when calling non-blocking RpcEngine methods e.g. get_client_id(). + std::shared_ptr<RpcEngine> engine_; + + // Automatically generated methods for RPC calls. See protoc_gen_hrpc.cc + ClientNamenodeProtocol namenode_; + const Options options_; +}; +} + +#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/proto/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/proto/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/proto/CMakeLists.txt new file mode 100644 index 0000000..2eff301 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/proto/CMakeLists.txt @@ -0,0 +1,87 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
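#
# Note: this file drives protoc twice. protobuf_generate_cpp emits the
# ordinary C++ message classes, while the protoc-gen-hrpc plugin built below
# and invoked through the GEN_HRPC function emits the .hrpc.inl RPC stub
# headers (e.g. ClientNamenodeProtocol.hrpc.inl, included by
# namenode_operations.h).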
+# + +set(PROTOBUF_IMPORT_DIRS ${PROTO_HDFS_DIR} ${PROTO_HADOOP_DIR}) + +protobuf_generate_cpp(PROTO_SRCS PROTO_HDRS + ${PROTO_HDFS_DIR}/datatransfer.proto + ${PROTO_HDFS_DIR}/ClientDatanodeProtocol.proto + ${PROTO_HDFS_DIR}/ClientNamenodeProtocol.proto + ${PROTO_HDFS_DIR}/acl.proto + ${PROTO_HDFS_DIR}/datatransfer.proto + ${PROTO_HDFS_DIR}/encryption.proto + ${PROTO_HDFS_DIR}/erasurecoding.proto + ${PROTO_HDFS_DIR}/hdfs.proto + ${PROTO_HDFS_DIR}/inotify.proto + ${PROTO_HDFS_DIR}/xattr.proto + ${PROTO_HDFS_DIR}/ReconfigurationProtocol.proto + ${PROTO_HADOOP_DIR}/IpcConnectionContext.proto + ${PROTO_HADOOP_DIR}/ProtobufRpcEngine.proto + ${PROTO_HADOOP_DIR}/RpcHeader.proto + ${PROTO_HADOOP_DIR}/Security.proto +) + +add_executable(protoc-gen-hrpc protoc_gen_hrpc.cc) +target_link_libraries(protoc-gen-hrpc ${PROTOBUF_PROTOC_LIBRARY} ${PROTOBUF_LIBRARY}) + +function(GEN_HRPC SRCS) + if(NOT ARGN) + message(SEND_ERROR "Error: GEN_HRPC() called without any proto files") + return() + endif() + + if(DEFINED PROTOBUF_IMPORT_DIRS) + foreach(DIR ${PROTOBUF_IMPORT_DIRS}) + get_filename_component(ABS_PATH ${DIR} ABSOLUTE) + list(FIND _protobuf_include_path ${ABS_PATH} _contains_already) + if(${_contains_already} EQUAL -1) + list(APPEND _protobuf_include_path -I ${ABS_PATH}) + endif() + endforeach() + endif() + + set(${SRCS}) + + foreach(FIL ${ARGN}) + get_filename_component(ABS_FIL ${FIL} ABSOLUTE) + get_filename_component(FIL_WE ${FIL} NAME_WE) + + list(APPEND ${SRCS} "${CMAKE_CURRENT_BINARY_DIR}/${FIL_WE}.hrpc.inl") + + add_custom_command( + OUTPUT "${CMAKE_CURRENT_BINARY_DIR}/${FIL_WE}.hrpc.inl" + COMMAND ${PROTOBUF_PROTOC_EXECUTABLE} + ARGS --plugin=protoc-gen-hrpc=${CMAKE_CURRENT_BINARY_DIR}/protoc-gen-hrpc --hrpc_out=${CMAKE_CURRENT_BINARY_DIR} ${_protobuf_include_path} ${ABS_FIL} + DEPENDS ${ABS_FIL} ${PROTOBUF_PROTOC_EXECUTABLE} protoc-gen-hrpc + COMMENT "Running HRPC protocol buffer compiler on ${FIL}" + VERBATIM ) + endforeach() + + set_source_files_properties(${${SRCS}} PROPERTIES GENERATED TRUE) + set(${SRCS} ${${SRCS}} PARENT_SCOPE) +endfunction() + +gen_hrpc(HRPC_SRCS + ${PROTO_HDFS_DIR}/ClientNamenodeProtocol.proto +) + +add_library(proto_obj OBJECT ${PROTO_SRCS} ${PROTO_HDRS} ${HRPC_SRCS}) +if(HADOOP_BUILD) + add_dependencies(proto_obj copy_hadoop_files) +endif(HADOOP_BUILD) +add_library(proto $<TARGET_OBJECTS:proto_obj>) http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/proto/protoc_gen_hrpc.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/proto/protoc_gen_hrpc.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/proto/protoc_gen_hrpc.cc new file mode 100644 index 0000000..e7355c0 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/proto/protoc_gen_hrpc.cc @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "protobuf/cpp_helpers.h" + +#include <google/protobuf/compiler/code_generator.h> +#include <google/protobuf/compiler/plugin.h> +#include <google/protobuf/descriptor.h> +#include <google/protobuf/io/printer.h> +#include <google/protobuf/io/zero_copy_stream.h> +#include <google/protobuf/stubs/common.h> + +#include <memory> + +using ::google::protobuf::FileDescriptor; +using ::google::protobuf::MethodDescriptor; +using ::google::protobuf::ServiceDescriptor; +using ::google::protobuf::compiler::CodeGenerator; +using ::google::protobuf::compiler::GeneratorContext; +using ::google::protobuf::io::Printer; +using ::google::protobuf::io::ZeroCopyOutputStream; + +class StubGenerator : public CodeGenerator { +public: + virtual bool Generate(const FileDescriptor *file, const std::string &, + GeneratorContext *ctx, + std::string *error) const override; + +private: + void EmitService(const ServiceDescriptor *service, Printer *out) const; + void EmitMethod(const MethodDescriptor *method, Printer *out) const; +}; + +bool StubGenerator::Generate(const FileDescriptor *file, const std::string &, + GeneratorContext *ctx, std::string *) const { + namespace pb = ::google::protobuf; + std::unique_ptr<ZeroCopyOutputStream> os( + ctx->Open(StripProto(file->name()) + ".hrpc.inl")); + Printer out(os.get(), '$'); + for (int i = 0; i < file->service_count(); ++i) { + const ServiceDescriptor *service = file->service(i); + EmitService(service, &out); + } + return true; +} + +void StubGenerator::EmitService(const ServiceDescriptor *service, + Printer *out) const { + out->Print("\n// GENERATED AUTOMATICALLY. 
DO NOT MODIFY.\n" + "class $service$ {\n" + "private:\n" + " std::shared_ptr<::hdfs::RpcEngine> engine_;\n" + "public:\n" + " typedef std::function<void(const ::hdfs::Status &)> Callback;\n" + " typedef ::google::protobuf::MessageLite Message;\n" + " inline $service$(std::shared_ptr<::hdfs::RpcEngine> engine)\n" + " : engine_(engine) {}\n", + "service", service->name()); + for (int i = 0; i < service->method_count(); ++i) { + const MethodDescriptor *method = service->method(i); + EmitMethod(method, out); + } + out->Print("};\n"); +} + +void StubGenerator::EmitMethod(const MethodDescriptor *method, + Printer *out) const { + out->Print( + "\n inline void $camel_method$(const Message *req, " + "const std::shared_ptr<Message> &resp, " + "const Callback &handler) {\n" + " engine_->AsyncRpc(\"$method$\", req, resp, handler);\n" + " }\n", + "camel_method", ToCamelCase(method->name()), "method", method->name()); +} + +int main(int argc, char *argv[]) { + StubGenerator generator; + return google::protobuf::compiler::PluginMain(argc, argv, &generator); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt new file mode 100644 index 0000000..2bcfd92 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt @@ -0,0 +1,21 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +add_library(reader_obj OBJECT block_reader.cc datatransfer.cc readergroup.cc) +add_dependencies(reader_obj proto) +add_library(reader $<TARGET_OBJECTS:reader_obj>) http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc new file mode 100644 index 0000000..ca7715d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc @@ -0,0 +1,571 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "reader/block_reader.h" +#include "reader/datatransfer.h" +#include "common/continuation/continuation.h" +#include "common/continuation/asio.h" +#include "common/logging.h" +#include "common/util.h" + +#include <future> + +namespace hdfs { + +#define FMT_CONT_AND_PARENT_ADDR "this=" << (void*)this << ", parent=" << (void*)parent_ +#define FMT_CONT_AND_READER_ADDR "this=" << (void*)this << ", reader=" << (void*)reader_ +#define FMT_THIS_ADDR "this=" << (void*)this + + +// Populate an OpReadBlockProto message with the required fields. +hadoop::hdfs::OpReadBlockProto ReadBlockProto(const std::string &client_name, + bool verify_checksum, const hadoop::common::TokenProto *token, + const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length, uint64_t offset) +{ + using namespace hadoop::hdfs; + using namespace hadoop::common; + BaseHeaderProto *base_h = new BaseHeaderProto(); + base_h->set_allocated_block(new ExtendedBlockProto(*block)); + if (token) { + base_h->set_allocated_token(new TokenProto(*token)); + } + ClientOperationHeaderProto *h = new ClientOperationHeaderProto(); + h->set_clientname(client_name); + h->set_allocated_baseheader(base_h); + + OpReadBlockProto p; + p.set_allocated_header(h); + p.set_offset(offset); + p.set_len(length); + p.set_sendchecksums(verify_checksum); + // TODO: p.set_allocated_cachingstrategy(); + return p; +} + +// +// Notes about the BlockReader and associated object lifecycles (9/29/16) +// -We have several stages in the read pipeline. Each stage represents a logical +// step in the HDFS block transfer logic. They are implemented as continuations +// for now, and in some cases a stage may have a nested continuation as well. +// It's important to make sure that continuations, nested or otherwise, cannot +// outlive the objects they depend on. +// +// -The BlockReader holds a shared_ptr to the DataNodeConnection that's used in each +// pipeline stage. The connection object must never be destroyed while operations are +// pending on the ASIO side (see HDFS-10931). In order to prevent a state where the +// BlockReader or one of the corresponding pipelines outlives the connection, each +// pipeline stage must explicitly hold a shared pointer copied from BlockReaderImpl::dn_. +// + + +static int8_t unsecured_request_block_header[3] = {0, kDataTransferVersion, Operation::kReadBlock}; + +void BlockReaderImpl::AsyncRequestBlock(const std::string &client_name, + const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length, + uint64_t offset, const std::function<void(Status)> &handler) +{ + LOG_TRACE(kBlockReader, << "BlockReaderImpl::AsyncRequestBlock(" + << FMT_THIS_ADDR << ", ..., length=" + << length << ", offset=" << offset << ", ...) called"); + + // The total number of bytes that we need to transfer from the DN is + // the amount that the user wants (length), plus the padding at + // the beginning in order to chunk-align. 
Note that the DN may elect + // to send more than this amount if the read starts/ends mid-chunk. + bytes_to_read_ = length; + + struct State { + std::string header; + hadoop::hdfs::OpReadBlockProto request; + hadoop::hdfs::BlockOpResponseProto response; + }; + + auto m = continuation::Pipeline<State>::Create(cancel_state_); + State *s = &m->state(); + + s->request = ReadBlockProto(client_name, options_.verify_checksum, + dn_->token_.get(), block, length, offset); + + s->header = std::string((const char*)unsecured_request_block_header, 3); + + bool serialize_success = true; + s->header += SerializeDelimitedProtobufMessage(&s->request, &serialize_success); + + if(!serialize_success) { + handler(Status::Error("Unable to serialize protobuf message")); + return; + } + + auto read_pb_message = + new continuation::ReadDelimitedPBMessageContinuation<AsyncStream, 16384>(dn_, &s->response); + + m->Push(asio_continuation::Write(dn_, asio::buffer(s->header))).Push(read_pb_message); + + m->Run([this, handler, offset](const Status &status, const State &s) { Status stat = status; + if (stat.ok()) { + const auto &resp = s.response; + + if(this->event_handlers_) { + event_response event_resp = this->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0); +#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED + if (stat.ok() && event_resp.response_type() == event_response::kTest_Error) { + stat = Status::Error("Test error"); + } +#endif + } + + if (stat.ok() && resp.status() == ::hadoop::hdfs::Status::SUCCESS) { + if (resp.has_readopchecksuminfo()) { + const auto &checksum_info = resp.readopchecksuminfo(); + chunk_padding_bytes_ = offset - checksum_info.chunkoffset(); + } + state_ = kReadPacketHeader; + } else { + stat = Status::Error(s.response.message().c_str()); + } + } + handler(stat); + }); +} + +Status BlockReaderImpl::RequestBlock(const std::string &client_name, + const hadoop::hdfs::ExtendedBlockProto *block, + uint64_t length, uint64_t offset) +{ + LOG_TRACE(kBlockReader, << "BlockReaderImpl::RequestBlock(" + << FMT_THIS_ADDR <<"..., length=" + << length << ", offset=" << offset << ") called"); + + auto stat = std::make_shared<std::promise<Status>>(); + std::future<Status> future(stat->get_future()); + AsyncRequestBlock(client_name, block, length, offset, + [stat](const Status &status) { stat->set_value(status); }); + return future.get(); +} + +struct BlockReaderImpl::ReadPacketHeader : continuation::Continuation +{ + ReadPacketHeader(BlockReaderImpl *parent) : parent_(parent), shared_conn_(parent->dn_) {} + + virtual void Run(const Next &next) override { + LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadPacketHeader::Run(" + << FMT_CONT_AND_PARENT_ADDR << ") called"); + + parent_->packet_data_read_bytes_ = 0; + parent_->packet_len_ = 0; + auto handler = [next, this](const asio::error_code &ec, size_t) { + Status status; + if (ec) { + status = Status(ec.value(), ec.message().c_str()); + } else { + parent_->packet_len_ = packet_length(); + parent_->header_.Clear(); + bool v = parent_->header_.ParseFromArray(&buf_[kHeaderStart], + header_length()); + assert(v && "Failed to parse the header"); + (void)v; //avoids unused variable warning + parent_->state_ = kReadChecksum; + } + if(parent_->event_handlers_) { + event_response event_resp = parent_->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0); +#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED + if (status.ok() && event_resp.response_type() == event_response::kTest_Error) { + status = Status::Error("Test error"); + } +#endif + } + next(status); + }; + + 
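+ // Wire format note: each packet starts with a fixed 6-byte prefix (a 4-byte
+ // big-endian payload length followed by a 2-byte big-endian header length),
+ // after which the variable-length PacketHeaderProto follows. CompletionHandler
+ // (below) is passed to asio::async_read as the completion condition: it
+ // returns the number of bytes still required, so the read first pulls in the
+ // 6-byte prefix and then exactly header_length() additional bytes.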
asio::async_read(*parent_->dn_, asio::buffer(buf_), + std::bind(&ReadPacketHeader::CompletionHandler, this, + std::placeholders::_1, std::placeholders::_2), handler); + } + +private: + static const size_t kMaxHeaderSize = 512; + static const size_t kPayloadLenOffset = 0; + static const size_t kPayloadLenSize = sizeof(int32_t); + static const size_t kHeaderLenOffset = 4; + static const size_t kHeaderLenSize = sizeof(int16_t); + static const size_t kHeaderStart = kPayloadLenSize + kHeaderLenSize; + + BlockReaderImpl *parent_; + std::array<char, kMaxHeaderSize> buf_; + + size_t packet_length() const { + return ntohl(*reinterpret_cast<const uint32_t *>(&buf_[kPayloadLenOffset])); + } + + size_t header_length() const { + return ntohs(*reinterpret_cast<const uint16_t *>(&buf_[kHeaderLenOffset])); + } + + size_t CompletionHandler(const asio::error_code &ec, size_t transferred) { + if (ec) { + return 0; + } else if (transferred < kHeaderStart) { + return kHeaderStart - transferred; + } else { + return kHeaderStart + header_length() - transferred; + } + } + + // Keep the DN connection alive + std::shared_ptr<DataNodeConnection> shared_conn_; +}; + +struct BlockReaderImpl::ReadChecksum : continuation::Continuation +{ + ReadChecksum(BlockReaderImpl *parent) : parent_(parent), shared_conn_(parent->dn_) {} + + virtual void Run(const Next &next) override { + LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadChecksum::Run(" + << FMT_CONT_AND_PARENT_ADDR << ") called"); + + auto parent = parent_; + if (parent->state_ != kReadChecksum) { + next(Status::OK()); + return; + } + + std::shared_ptr<DataNodeConnection> keep_conn_alive_ = shared_conn_; + + auto handler = [parent, next, this, keep_conn_alive_](const asio::error_code &ec, size_t) + { + Status status; + if (ec) { + status = Status(ec.value(), ec.message().c_str()); + } else { + parent->state_ = parent->chunk_padding_bytes_ ? 
kReadPadding : kReadData; + } + if(parent->event_handlers_) { + event_response event_resp = parent->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0); +#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED + if (status.ok() && event_resp.response_type() == event_response::kTest_Error) { + status = Status::Error("Test error"); + } +#endif + } + next(status); + }; + + parent->checksum_.resize(parent->packet_len_ - sizeof(int) - parent->header_.datalen()); + + asio::async_read(*parent->dn_, asio::buffer(parent->checksum_), handler); + } + +private: + BlockReaderImpl *parent_; + + // Keep the DataNodeConnection alive + std::shared_ptr<DataNodeConnection> shared_conn_; +}; + +struct BlockReaderImpl::ReadData : continuation::Continuation +{ + ReadData(BlockReaderImpl *parent, std::shared_ptr<size_t> bytes_transferred, + const asio::mutable_buffers_1 &buf) : parent_(parent), + bytes_transferred_(bytes_transferred), buf_(buf), shared_conn_(parent->dn_) {} + + virtual void Run(const Next &next) override { + LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadData::Run(" + << FMT_CONT_AND_PARENT_ADDR << ") called"); + auto handler = + [next, this](const asio::error_code &ec, size_t transferred) { + Status status; + if (ec) { + status = Status(ec.value(), ec.message().c_str()); + } + + *bytes_transferred_ += transferred; + parent_->bytes_to_read_ -= transferred; + parent_->packet_data_read_bytes_ += transferred; + + if (parent_->packet_data_read_bytes_ >= parent_->header_.datalen()) { + parent_->state_ = kReadPacketHeader; + } + + if(parent_->event_handlers_) { + event_response event_resp = parent_->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0); +#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED + if (status.ok() && event_resp.response_type() == event_response::kTest_Error) { + status = Status::Error("Test error"); + } +#endif + } + next(status); + }; + + auto data_len = parent_->header_.datalen() - parent_->packet_data_read_bytes_; + + asio::async_read(*parent_->dn_, buf_, asio::transfer_exactly(data_len), handler); + } + +private: + BlockReaderImpl *parent_; + std::shared_ptr<size_t> bytes_transferred_; + const asio::mutable_buffers_1 buf_; + + // Keep DNConnection alive. 
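+ // (See HDFS-10931 and the lifecycle notes near the top of this file: each
+ // pipeline stage keeps a copy of dn_ like this one, so the connection cannot
+ // be destroyed while an async operation issued by the stage is still pending.)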
+ std::shared_ptr<DataNodeConnection> shared_conn_; +}; + +struct BlockReaderImpl::ReadPadding : continuation::Continuation +{ + ReadPadding(BlockReaderImpl *parent) : parent_(parent), + padding_(parent->chunk_padding_bytes_), + bytes_transferred_(std::make_shared<size_t>(0)), + read_data_(new ReadData(parent, bytes_transferred_, asio::buffer(padding_))), + shared_conn_(parent->dn_) {} + + virtual void Run(const Next &next) override { + LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadPadding::Run(" + << FMT_CONT_AND_PARENT_ADDR << ") called"); + + if (parent_->state_ != kReadPadding || !parent_->chunk_padding_bytes_) { + next(Status::OK()); + return; + } + + std::shared_ptr<DataNodeConnection> keep_conn_alive_ = shared_conn_; + + auto h = [next, this, keep_conn_alive_](const Status &stat) { + Status status = stat; + if (status.ok()) { + assert(static_cast<int>(*bytes_transferred_) == parent_->chunk_padding_bytes_); + parent_->chunk_padding_bytes_ = 0; + parent_->state_ = kReadData; + } + if(parent_->event_handlers_) { + event_response event_resp = parent_->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0); +#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED + if (status.ok() && event_resp.response_type() == event_response::kTest_Error) { + status = Status::Error("Test error"); + } +#endif + } + next(status); + }; + read_data_->Run(h); + } + +private: + BlockReaderImpl *parent_; + std::vector<char> padding_; + std::shared_ptr<size_t> bytes_transferred_; + std::shared_ptr<continuation::Continuation> read_data_; + ReadPadding(const ReadPadding &) = delete; + ReadPadding &operator=(const ReadPadding &) = delete; + + // Keep DNConnection alive. + std::shared_ptr<DataNodeConnection> shared_conn_; +}; + + +struct BlockReaderImpl::AckRead : continuation::Continuation +{ + AckRead(BlockReaderImpl *parent) : parent_(parent), shared_conn_(parent->dn_) {} + + virtual void Run(const Next &next) override { + LOG_TRACE(kBlockReader, << "BlockReaderImpl::AckRead::Run(" << FMT_CONT_AND_PARENT_ADDR << ") called"); + + if (parent_->bytes_to_read_ > 0) { + next(Status::OK()); + return; + } + + auto m = continuation::Pipeline<hadoop::hdfs::ClientReadStatusProto>::Create(parent_->cancel_state_); + + m->state().set_status(parent_->options_.verify_checksum + ? hadoop::hdfs::Status::CHECKSUM_OK + : hadoop::hdfs::Status::SUCCESS); + + m->Push(continuation::WriteDelimitedPBMessage(parent_->dn_, &m->state())); + + std::shared_ptr<DataNodeConnection> keep_conn_alive_ = shared_conn_; + + m->Run([this, next, keep_conn_alive_](const Status &stat, const hadoop::hdfs::ClientReadStatusProto &) + { + Status status = stat; + if (status.ok()) { + parent_->state_ = BlockReaderImpl::kFinished; + } + if(parent_->event_handlers_) { + event_response event_resp = parent_->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0); +#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED + if (status.ok() && event_resp.response_type() == event_response::kTest_Error) { + status = Status::Error("Test error"); + } +#endif + } + next(status); + }); + } + +private: + BlockReaderImpl *parent_; + + // Keep DNConnection alive. 
+ std::shared_ptr<DataNodeConnection> shared_conn_; +}; + +void BlockReaderImpl::AsyncReadPacket(const MutableBuffers &buffers, + const std::function<void(const Status &, size_t bytes_transferred)> &handler) +{ + assert(state_ != kOpen && "Not connected"); + + LOG_TRACE(kBlockReader, << "BlockReaderImpl::AsyncReadPacket called"); + + struct State { + std::shared_ptr<size_t> bytes_transferred; + }; + + auto m = continuation::Pipeline<State>::Create(cancel_state_); + m->state().bytes_transferred = std::make_shared<size_t>(0); + + // Note: some of these continuations have nested pipelines. + m->Push(new ReadPacketHeader(this)) + .Push(new ReadChecksum(this)) + .Push(new ReadPadding(this)) + .Push(new ReadData( + this, m->state().bytes_transferred, buffers)) + .Push(new AckRead(this)); + + auto self = this->shared_from_this(); + m->Run([self, handler](const Status &status, const State &state) { + handler(status, *state.bytes_transferred); + }); +} + + +size_t BlockReaderImpl::ReadPacket(const MutableBuffers &buffers, Status *status) +{ + LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadPacket called"); + + size_t transferred = 0; + auto done = std::make_shared<std::promise<void>>(); + auto future = done->get_future(); + AsyncReadPacket(buffers, + [status, &transferred, done](const Status &stat, size_t t) { + *status = stat; + transferred = t; + done->set_value(); + }); + future.wait(); + return transferred; +} + + +struct BlockReaderImpl::RequestBlockContinuation : continuation::Continuation +{ + RequestBlockContinuation(BlockReader *reader, const std::string &client_name, + const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length, uint64_t offset) + : reader_(reader), client_name_(client_name), length_(length), offset_(offset) + { + block_.CheckTypeAndMergeFrom(*block); + } + + virtual void Run(const Next &next) override { + LOG_TRACE(kBlockReader, << "BlockReaderImpl::RequestBlockContinuation::Run(" + << FMT_CONT_AND_READER_ADDR << ") called"); + + reader_->AsyncRequestBlock(client_name_, &block_, length_, offset_, next); + } + +private: + BlockReader *reader_; + const std::string client_name_; + hadoop::hdfs::ExtendedBlockProto block_; + uint64_t length_; + uint64_t offset_; +}; + +struct BlockReaderImpl::ReadBlockContinuation : continuation::Continuation +{ + ReadBlockContinuation(BlockReader *reader, MutableBuffers buffer, size_t *transferred) + : reader_(reader), buffer_(buffer), buffer_size_(asio::buffer_size(buffer)), transferred_(transferred) {} + + virtual void Run(const Next &next) override { + LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadBlockContinuation::Run(" + << FMT_CONT_AND_READER_ADDR << ") called"); + *transferred_ = 0; + next_ = next; + OnReadData(Status::OK(), 0); + } + +private: + BlockReader *reader_; + const MutableBuffers buffer_; + const size_t buffer_size_; + size_t *transferred_; + std::function<void(const Status &)> next_; + + void OnReadData(const Status &status, size_t transferred) { + using std::placeholders::_1; + using std::placeholders::_2; + *transferred_ += transferred; + if (!status.ok()) { + next_(status); + } else if (*transferred_ >= buffer_size_) { + next_(status); + } else { + reader_->AsyncReadPacket( + asio::buffer(buffer_ + *transferred_, buffer_size_ - *transferred_), + std::bind(&ReadBlockContinuation::OnReadData, this, _1, _2)); + } + } +}; + +void BlockReaderImpl::AsyncReadBlock( + const std::string & client_name, + const hadoop::hdfs::LocatedBlockProto &block, + size_t offset, + const MutableBuffers &buffers, + const 
std::function<void(const Status &, size_t)> handler) +{ + LOG_TRACE(kBlockReader, << "BlockReaderImpl::AsyncReadBlock(" + << FMT_THIS_ADDR << ") called"); + + auto m = continuation::Pipeline<size_t>::Create(cancel_state_); + size_t * bytesTransferred = &m->state(); + + size_t size = asio::buffer_size(buffers); + + m->Push(new RequestBlockContinuation(this, client_name, &block.b(), size, offset)) + .Push(new ReadBlockContinuation(this, buffers, bytesTransferred)); + + m->Run([handler] (const Status &status, const size_t totalBytesTransferred) { + handler(status, totalBytesTransferred); + }); +} + +void BlockReaderImpl::CancelOperation() { + LOG_TRACE(kBlockReader, << "BlockReaderImpl::CancelOperation(" + << FMT_THIS_ADDR << ") called"); + /* just forward cancel to DNConnection */ + dn_->Cancel(); +} + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h new file mode 100644 index 0000000..b5cbdf5 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef BLOCK_READER_H_ +#define BLOCK_READER_H_ + +#include "hdfspp/status.h" +#include "common/async_stream.h" +#include "common/cancel_tracker.h" +#include "common/new_delete.h" +#include "datatransfer.pb.h" +#include "connection/datanodeconnection.h" + +#include <memory> + +namespace hdfs { + +struct CacheStrategy { + bool drop_behind_specified; + bool drop_behind; + bool read_ahead_specified; + unsigned long long read_ahead; + CacheStrategy() + : drop_behind_specified(false), drop_behind(false), + read_ahead_specified(false), read_ahead(0) {} +}; + +enum DropBehindStrategy { + kUnspecified = 0, + kEnableDropBehind = 1, + kDisableDropBehind = 2, +}; + +enum EncryptionScheme { + kNone = 0, + kAESCTRNoPadding = 1, +}; + +struct BlockReaderOptions { + bool verify_checksum; + CacheStrategy cache_strategy; + EncryptionScheme encryption_scheme; + + BlockReaderOptions() + : verify_checksum(true), encryption_scheme(EncryptionScheme::kNone) {} +}; + +/** + * Handles the operational state of requesting and reading a block (or portion of + * a block) from a DataNode. + * + * Threading model: not thread-safe. + * Lifecycle: should be created, used for a single read, then freed. 
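+ *
+ * A minimal synchronous usage sketch (illustrative only, not part of this
+ * header: `dn`, `cancel_handle`, `client_name`, `extended_block`, and `buf`
+ * stand for an established DataNodeConnection, a CancelTracker handle, and
+ * caller-supplied state; buffer advancement between packets is omitted):
+ *
+ *   auto reader = std::make_shared<BlockReaderImpl>(
+ *       BlockReaderOptions(), dn, cancel_handle);
+ *   Status stat = reader->RequestBlock(client_name, &extended_block, length, offset);
+ *   size_t remaining = length;
+ *   while (stat.ok() && remaining > 0) {
+ *     remaining -= reader->ReadPacket(asio::buffer(buf), &stat);
+ *   }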
+ */ +class BlockReader { +public: + MEMCHECKED_CLASS(BlockReader) + virtual void AsyncReadBlock( + const std::string & client_name, + const hadoop::hdfs::LocatedBlockProto &block, size_t offset, + const MutableBuffers &buffers, + const std::function<void(const Status &, size_t)> handler) = 0; + + virtual void AsyncReadPacket( + const MutableBuffers &buffers, + const std::function<void(const Status &, size_t bytes_transferred)> &handler) = 0; + + virtual void AsyncRequestBlock( + const std::string &client_name, + const hadoop::hdfs::ExtendedBlockProto *block, + uint64_t length, + uint64_t offset, + const std::function<void(Status)> &handler) = 0; + + virtual void CancelOperation() = 0; +}; + +class BlockReaderImpl + : public BlockReader, public std::enable_shared_from_this<BlockReaderImpl> { +public: + explicit BlockReaderImpl(const BlockReaderOptions &options, std::shared_ptr<DataNodeConnection> dn, + CancelHandle cancel_state, std::shared_ptr<LibhdfsEvents> event_handlers=nullptr) + : dn_(dn), state_(kOpen), options_(options), + chunk_padding_bytes_(0), cancel_state_(cancel_state), event_handlers_(event_handlers.get()) {} + + virtual void AsyncReadPacket( + const MutableBuffers &buffers, + const std::function<void(const Status &, size_t bytes_transferred)> &handler) override; + + virtual void AsyncRequestBlock( + const std::string &client_name, + const hadoop::hdfs::ExtendedBlockProto *block, + uint64_t length, + uint64_t offset, + const std::function<void(Status)> &handler) override; + + virtual void AsyncReadBlock( + const std::string & client_name, + const hadoop::hdfs::LocatedBlockProto &block, size_t offset, + const MutableBuffers &buffers, + const std::function<void(const Status &, size_t)> handler) override; + + virtual void CancelOperation() override; + + size_t ReadPacket(const MutableBuffers &buffers, Status *status); + + Status RequestBlock( + const std::string &client_name, + const hadoop::hdfs::ExtendedBlockProto *block, + uint64_t length, + uint64_t offset); + +private: + struct RequestBlockContinuation; + struct ReadBlockContinuation; + + struct ReadPacketHeader; + struct ReadChecksum; + struct ReadPadding; + struct ReadData; + struct AckRead; + enum State { + kOpen, + kReadPacketHeader, + kReadChecksum, + kReadPadding, + kReadData, + kFinished, + }; + + std::shared_ptr<DataNodeConnection> dn_; + hadoop::hdfs::PacketHeaderProto header_; + State state_; + BlockReaderOptions options_; + size_t packet_len_; + int packet_data_read_bytes_; + int chunk_padding_bytes_; + long long bytes_to_read_; + std::vector<char> checksum_; + CancelHandle cancel_state_; + LibhdfsEvents* event_handlers_; +}; +} + +#endif