http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/DataTransferProtocolSender.cpp ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/src/client/DataTransferProtocolSender.cpp b/depends/libhdfs3/src/client/DataTransferProtocolSender.cpp new file mode 100644 index 0000000..c0d0e10 --- /dev/null +++ b/depends/libhdfs3/src/client/DataTransferProtocolSender.cpp @@ -0,0 +1,203 @@ +/******************************************************************** + * Copyright (c) 2013 - 2014, Pivotal Inc. + * All rights reserved. + * + * Author: Zhanwei Wang + ********************************************************************/ +/******************************************************************** + * 2014 - + * open source under Apache License Version 2.0 + ********************************************************************/ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "client/Token.h" +#include "datatransfer.pb.h" +#include "DataTransferProtocolSender.h" +#include "Exception.h" +#include "ExceptionInternal.h" +#include "hdfs.pb.h" +#include "Security.pb.h" +#include "WriteBuffer.h" + +using namespace google::protobuf; + +namespace Hdfs { +namespace Internal { + +static inline void Send(Socket & sock, DataTransferOp op, Message * msg, + int writeTimeout) { + WriteBuffer buffer; + buffer.writeBigEndian(static_cast<int16_t>(DATA_TRANSFER_VERSION)); + buffer.write(static_cast<char>(op)); + int msgSize = msg->ByteSize(); + buffer.writeVarint32(msgSize); + char * b = buffer.alloc(msgSize); + + if (!msg->SerializeToArray(b, msgSize)) { + THROW(HdfsIOException, + "DataTransferProtocolSender cannot serialize header to send buffer."); + } + + sock.writeFully(buffer.getBuffer(0), buffer.getDataSize(0), writeTimeout); +} + +static inline void BuildBaseHeader(const ExtendedBlock & block, + const Token & accessToken, BaseHeaderProto * header) { + ExtendedBlockProto * eb = header->mutable_block(); + TokenProto * token = header->mutable_token(); + eb->set_blockid(block.getBlockId()); + eb->set_generationstamp(block.getGenerationStamp()); + eb->set_numbytes(block.getNumBytes()); + eb->set_poolid(block.getPoolId()); + token->set_identifier(accessToken.getIdentifier()); + token->set_password(accessToken.getPassword()); + token->set_kind(accessToken.getKind()); + token->set_service(accessToken.getService()); +} + +static inline void BuildClientHeader(const ExtendedBlock & block, + const Token & accessToken, const char * clientName, + ClientOperationHeaderProto * header) { + header->set_clientname(clientName); + BuildBaseHeader(block, accessToken, header->mutable_baseheader()); +} + +static inline void BuildNodeInfo(const DatanodeInfo & node, + DatanodeInfoProto * 
info) { + DatanodeIDProto * id = info->mutable_id(); + id->set_hostname(node.getHostName()); + id->set_infoport(node.getInfoPort()); + id->set_ipaddr(node.getIpAddr()); + id->set_ipcport(node.getIpcPort()); + id->set_datanodeuuid(node.getDatanodeId()); + id->set_xferport(node.getXferPort()); + info->set_location(node.getLocation()); +} + +static inline void BuildNodesInfo(const std::vector<DatanodeInfo> & nodes, + RepeatedPtrField<DatanodeInfoProto> * infos) { + for (std::size_t i = 0; i < nodes.size(); ++i) { + BuildNodeInfo(nodes[i], infos->Add()); + } +} + +DataTransferProtocolSender::DataTransferProtocolSender(Socket & sock, + int writeTimeout, const std::string & datanodeAddr) : + sock(sock), writeTimeout(writeTimeout), datanode(datanodeAddr) { +} + +DataTransferProtocolSender::~DataTransferProtocolSender() { +} + +void DataTransferProtocolSender::readBlock(const ExtendedBlock & blk, + const Token & blockToken, const char * clientName, + int64_t blockOffset, int64_t length) { + try { + OpReadBlockProto op; + op.set_len(length); + op.set_offset(blockOffset); + BuildClientHeader(blk, blockToken, clientName, op.mutable_header()); + Send(sock, READ_BLOCK, &op, writeTimeout); + } catch (const HdfsCanceled & e) { + throw; + } catch (const HdfsException & e) { + NESTED_THROW(HdfsIOException, + "DataTransferProtocolSender cannot send read request to datanode %s.", + datanode.c_str()); + } +} + +void DataTransferProtocolSender::writeBlock(const ExtendedBlock & blk, + const Token & blockToken, const char * clientName, + const std::vector<DatanodeInfo> & targets, int stage, int pipelineSize, + int64_t minBytesRcvd, int64_t maxBytesRcvd, + int64_t latestGenerationStamp, int checksumType, int bytesPerChecksum) { + try { + OpWriteBlockProto op; + op.set_latestgenerationstamp(latestGenerationStamp); + op.set_minbytesrcvd(minBytesRcvd); + op.set_maxbytesrcvd(maxBytesRcvd); + op.set_pipelinesize(targets.size()); + op.set_stage((OpWriteBlockProto_BlockConstructionStage) stage); + BuildClientHeader(blk, blockToken, clientName, op.mutable_header()); + ChecksumProto * ck = op.mutable_requestedchecksum(); + ck->set_bytesperchecksum(bytesPerChecksum); + ck->set_type((ChecksumTypeProto) checksumType); + BuildNodesInfo(targets, op.mutable_targets()); + Send(sock, WRITE_BLOCK, &op, writeTimeout); + } catch (const HdfsCanceled & e) { + throw; + } catch (const HdfsException & e) { + NESTED_THROW(HdfsIOException, + "DataTransferProtocolSender cannot send write request to datanode %s.", + datanode.c_str()); + } +} + +void DataTransferProtocolSender::transferBlock(const ExtendedBlock & blk, + const Token & blockToken, const char * clientName, + const std::vector<DatanodeInfo> & targets) { + try { + OpTransferBlockProto op; + BuildClientHeader(blk, blockToken, clientName, op.mutable_header()); + BuildNodesInfo(targets, op.mutable_targets()); + Send(sock, TRANSFER_BLOCK, &op, writeTimeout); + } catch (const HdfsCanceled & e) { + throw; + } catch (const HdfsException & e) { + NESTED_THROW(HdfsIOException, + "DataTransferProtocolSender cannot send transfer request to datanode %s.", + datanode.c_str()); + } +} + +void DataTransferProtocolSender::blockChecksum(const ExtendedBlock & blk, + const Token & blockToken) { + try { + //TODO + } catch (const HdfsCanceled & e) { + throw; + } catch (const HdfsException & e) { + NESTED_THROW(HdfsIOException, + "DataTransferProtocolSender cannot send checksum request to datanode %s.", + datanode.c_str()); + } +} + +void DataTransferProtocolSender::requestShortCircuitFds(const 
ExtendedBlock blk,
+                                                             const Token& blockToken,
+                                                             uint32_t maxVersion) {
+    try {
+        OpRequestShortCircuitAccessProto op;
+        BuildBaseHeader(blk, blockToken, op.mutable_header());
+        op.set_maxversion(maxVersion);
+
+        Send(sock, REQUEST_SHORT_CIRCUIT_FDS, &op, writeTimeout);
+    } catch (const HdfsCanceled& e) {
+        throw;
+    } catch (const HdfsException& e) {
+        NESTED_THROW(HdfsIOException,
+                     "DataTransferProtocolSender cannot send request "
+                     "short-circuit fds request "
+                     "to datanode %s.",
+                     datanode.c_str());
+    }
+}
+}
+}
+
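For orientation, here is a minimal sketch of how code elsewhere in libhdfs3 might drive this class to issue a READ_BLOCK request over an already-connected socket. It is illustrative only and not part of the commit: the helper name, write timeout, client name and read range are assumptions, and ExtendedBlock, Token and Socket are assumed to be visible through the headers DataTransferProtocolSender.h pulls in. Only the constructor and readBlock() signatures are taken from the header file that follows.

    #include "DataTransferProtocolSender.h"

    #include <string>

    namespace Hdfs {
    namespace Internal {

    // Hypothetical helper: ask the datanode reachable through the already
    // connected 'sock' for the first 1 KB of 'blk'.  The 3000 ms write
    // timeout and the client name are placeholders for this sketch.
    static void sendReadRequest(Socket & sock, const ExtendedBlock & blk,
                                const Token & blockToken,
                                const std::string & datanodeAddr) {
        DataTransferProtocolSender sender(sock, 3000, datanodeAddr);
        // Writes DATA_TRANSFER_VERSION, the READ_BLOCK opcode and a serialized
        // OpReadBlockProto to the socket; reading the datanode's reply is left
        // to the caller, since the sender only transmits requests.
        sender.readBlock(blk, blockToken, "libhdfs3-example-client",
                         0 /* block offset */, 1024 /* length */);
    }

    }
    }

If the send fails, the sender nests the underlying socket error inside an HdfsIOException that names the datanode address, as the catch blocks above show; HdfsCanceled is rethrown unchanged.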
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/DataTransferProtocolSender.h ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/src/client/DataTransferProtocolSender.h b/depends/libhdfs3/src/client/DataTransferProtocolSender.h new file mode 100644 index 0000000..63d1c36 --- /dev/null +++ b/depends/libhdfs3/src/client/DataTransferProtocolSender.h @@ -0,0 +1,143 @@ +/******************************************************************** + * Copyright (c) 2013 - 2014, Pivotal Inc. + * All rights reserved. + * + * Author: Zhanwei Wang + ********************************************************************/ +/******************************************************************** + * 2014 - + * open source under Apache License Version 2.0 + ********************************************************************/ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef _HDFS_LIBHDFS_3_CLIENT_DATATRANSFERPROTOCOLSENDER_H_ +#define _HDFS_LIBHDFS_3_CLIENT_DATATRANSFERPROTOCOLSENDER_H_ + +#include "DataTransferProtocol.h" +#include "network/Socket.h" + +/** + * Version 28: + * Declare methods in DataTransferProtocol interface. + */ +#define DATA_TRANSFER_VERSION 28 + +namespace Hdfs { +namespace Internal { + +enum DataTransferOp { + WRITE_BLOCK = 80, + READ_BLOCK = 81, + READ_METADATA = 82, + REPLACE_BLOCK = 83, + COPY_BLOCK = 84, + BLOCK_CHECKSUM = 85, + TRANSFER_BLOCK = 86, + REQUEST_SHORT_CIRCUIT_FDS = 87, + RELEASE_SHORT_CIRCUIT_FDS = 88 +}; + +/** + * Transfer data to/from datanode using a streaming protocol. + */ +class DataTransferProtocolSender: public DataTransferProtocol { +public: + DataTransferProtocolSender(Socket & sock, int writeTimeout, + const std::string & datanodeAddr); + + virtual ~DataTransferProtocolSender(); + + /** + * Read a block. + * + * @param blk the block being read. + * @param blockToken security token for accessing the block. + * @param clientName client's name. + * @param blockOffset offset of the block. + * @param length maximum number of bytes for this read. + */ + virtual void readBlock(const ExtendedBlock & blk, const Token & blockToken, + const char * clientName, int64_t blockOffset, int64_t length); + + /** + * Write a block to a datanode pipeline. + * + * @param blk the block being written. + * @param blockToken security token for accessing the block. + * @param clientName client's name. + * @param targets target datanodes in the pipeline. + * @param source source datanode. + * @param stage pipeline stage. + * @param pipelineSize the size of the pipeline. + * @param minBytesRcvd minimum number of bytes received. + * @param maxBytesRcvd maximum number of bytes received. 
+ * @param latestGenerationStamp the latest generation stamp of the block. + */ + virtual void writeBlock(const ExtendedBlock & blk, const Token & blockToken, + const char * clientName, const std::vector<DatanodeInfo> & targets, + int stage, int pipelineSize, int64_t minBytesRcvd, + int64_t maxBytesRcvd, int64_t latestGenerationStamp, + int checksumType, int bytesPerChecksum); + + /** + * Transfer a block to another datanode. + * The block stage must be + * either {@link BlockConstructionStage#TRANSFER_RBW} + * or {@link BlockConstructionStage#TRANSFER_FINALIZED}. + * + * @param blk the block being transferred. + * @param blockToken security token for accessing the block. + * @param clientName client's name. + * @param targets target datanodes. + */ + virtual void transferBlock(const ExtendedBlock & blk, + const Token & blockToken, const char * clientName, + const std::vector<DatanodeInfo> & targets); + + /** + * Get block checksum (MD5 of CRC32). + * + * @param blk a block. + * @param blockToken security token for accessing the block. + * @throw HdfsIOException + */ + virtual void blockChecksum(const ExtendedBlock & blk, + const Token & blockToken); + + /** + * Request short circuit access file descriptors from a DataNode. + * + * @param blk The block to get file descriptors for. + * @param blockToken Security token for accessing the block. + * @param maxVersion Maximum version of the block data the client + * can understand. + */ + virtual void requestShortCircuitFds(const ExtendedBlock blk, + const Token& blockToken, + uint32_t maxVersion); + +private: + Socket & sock; + int writeTimeout; + std::string datanode; +}; + +} +} + +#endif /* _HDFS_LIBHDFS_3_CLIENT_DATATRANSFERPROTOCOLSENDER_H_ */ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/DirectoryIterator.cpp ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/src/client/DirectoryIterator.cpp b/depends/libhdfs3/src/client/DirectoryIterator.cpp new file mode 100644 index 0000000..b5b5043 --- /dev/null +++ b/depends/libhdfs3/src/client/DirectoryIterator.cpp @@ -0,0 +1,100 @@ +/******************************************************************** + * Copyright (c) 2013 - 2014, Pivotal Inc. + * All rights reserved. + * + * Author: Zhanwei Wang + ********************************************************************/ +/******************************************************************** + * 2014 - + * open source under Apache License Version 2.0 + ********************************************************************/ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "DirectoryIterator.h" +#include "FileStatus.h" +#include "Exception.h" +#include "ExceptionInternal.h" +#include "FileSystemImpl.h" + +namespace Hdfs { + +DirectoryIterator::DirectoryIterator() : + needLocations(false), filesystem(NULL), next(0) { +} + +DirectoryIterator::DirectoryIterator(Hdfs::Internal::FileSystemImpl * const fs, + std::string path, bool needLocations) : + needLocations(needLocations), filesystem(fs), next(0), path(path) { +} + +DirectoryIterator::DirectoryIterator(const DirectoryIterator & it) : + needLocations(it.needLocations), filesystem(it.filesystem), next(it.next), path(it.path), startAfter( + it.startAfter), lists(it.lists) { +} + +DirectoryIterator & DirectoryIterator::operator =(const DirectoryIterator & it) { + if (this == &it) { + return *this; + } + + needLocations = it.needLocations; + filesystem = it.filesystem; + next = it.next; + path = it.path; + startAfter = it.startAfter; + lists = it.lists; + return *this; +} + +bool DirectoryIterator::getListing() { + bool more; + + if (NULL == filesystem) { + return false; + } + + next = 0; + lists.clear(); + more = filesystem->getListing(path, startAfter, needLocations, lists); + + if (!lists.empty()) { + startAfter = lists.back().getPath(); + } + + return more || !lists.empty(); +} + +bool DirectoryIterator::hasNext() { + if (next >= lists.size()) { + return getListing(); + } + + return true; +} + +Hdfs::FileStatus DirectoryIterator::getNext() { + if (next >= lists.size()) { + if (!getListing()) { + THROW(HdfsIOException, "End of the dir flow"); + } + } + + return lists[next++]; +} + +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/DirectoryIterator.h ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/src/client/DirectoryIterator.h b/depends/libhdfs3/src/client/DirectoryIterator.h new file mode 100644 index 0000000..cb12ad8 --- /dev/null +++ b/depends/libhdfs3/src/client/DirectoryIterator.h @@ -0,0 +1,63 @@ +/******************************************************************** + * Copyright (c) 2013 - 2014, Pivotal Inc. + * All rights reserved. + * + * Author: Zhanwei Wang + ********************************************************************/ +/******************************************************************** + * 2014 - + * open source under Apache License Version 2.0 + ********************************************************************/ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef _HDFS_LIBHFDS3_CLIENT_DIRECTORY_ITERATOR_H_ +#define _HDFS_LIBHFDS3_CLIENT_DIRECTORY_ITERATOR_H_ + +#include "FileStatus.h" +#include <vector> + +namespace Hdfs { +namespace Internal { +class FileSystemImpl; +} + +class DirectoryIterator { +public: + DirectoryIterator(); + DirectoryIterator(Hdfs::Internal::FileSystemImpl * const fs, + std::string path, bool needLocations); + DirectoryIterator(const DirectoryIterator & it); + DirectoryIterator & operator = (const DirectoryIterator & it); + bool hasNext(); + FileStatus getNext(); + +private: + bool getListing(); + +private: + bool needLocations; + Hdfs::Internal::FileSystemImpl * filesystem; + size_t next; + std::string path; + std::string startAfter; + std::vector<FileStatus> lists; +}; + +} + +#endif /* _HDFS_LIBHFDS3_CLIENT_DIRECTORY_ITERATOR_H_ */ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/FileStatus.h ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/src/client/FileStatus.h b/depends/libhdfs3/src/client/FileStatus.h new file mode 100644 index 0000000..53b855c --- /dev/null +++ b/depends/libhdfs3/src/client/FileStatus.h @@ -0,0 +1,167 @@ +/******************************************************************** + * Copyright (c) 2013 - 2014, Pivotal Inc. + * All rights reserved. + * + * Author: Zhanwei Wang + ********************************************************************/ +/******************************************************************** + * 2014 - + * open source under Apache License Version 2.0 + ********************************************************************/ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef _HDFS_LIBHDFS3_CLIENT_FILESTATUS_H_ +#define _HDFS_LIBHDFS3_CLIENT_FILESTATUS_H_ + +#include "Permission.h" + +#include <string> + +namespace Hdfs { + +class FileStatus { +public: + FileStatus() : + isdir(false), atime(0), blocksize(0), length(0), mtime( + 0), permission(0644), replications(0) { + } + + int64_t getAccessTime() const { + return atime; + } + + void setAccessTime(int64_t accessTime) { + atime = accessTime; + } + + short getReplication() const { + return replications; + } + + void setReplication(short blockReplication) { + replications = blockReplication; + } + + int64_t getBlockSize() const { + return blocksize; + } + + void setBlocksize(int64_t blocksize) { + this->blocksize = blocksize; + } + + const char * getGroup() const { + return group.c_str(); + } + + void setGroup(const char * group) { + this->group = group; + } + + /** + * Is this a directory? 
+ * @return true if this is a directory + */ + bool isDirectory() const { + return isdir; + } + + void setIsdir(bool isdir) { + this->isdir = isdir; + } + + int64_t getLength() const { + return length; + } + + void setLength(int64_t length) { + this->length = length; + } + + int64_t getModificationTime() const { + return mtime; + } + + void setModificationTime(int64_t modificationTime) { + mtime = modificationTime; + } + + const char * getOwner() const { + return owner.c_str(); + } + + void setOwner(const char * owner) { + this->owner = owner; + } + + const char * getPath() const { + return path.c_str(); + } + + void setPath(const char * path) { + this->path = path; + } + + const Permission & getPermission() const { + return permission; + } + + void setPermission(const Permission & permission) { + this->permission = permission; + } + + const char * getSymlink() const { + return symlink.c_str(); + } + + void setSymlink(const char * symlink) { + this->symlink = symlink; + } + + /** + * Is this a file? + * @return true if this is a file + */ + bool isFile() { + return !isdir && !isSymlink(); + } + + /** + * Is this a symbolic link? + * @return true if this is a symbolic link + */ + bool isSymlink() { + return !symlink.empty(); + } + +private: + bool isdir; + int64_t atime; + int64_t blocksize; + int64_t length; + int64_t mtime; + Permission permission; + short replications; + std::string group; + std::string owner; + std::string path; + std::string symlink; +}; + +} +#endif http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/FileSystem.cpp ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/src/client/FileSystem.cpp b/depends/libhdfs3/src/client/FileSystem.cpp new file mode 100644 index 0000000..0d0ef77 --- /dev/null +++ b/depends/libhdfs3/src/client/FileSystem.cpp @@ -0,0 +1,591 @@ +/******************************************************************** + * Copyright (c) 2013 - 2014, Pivotal Inc. + * All rights reserved. + * + * Author: Zhanwei Wang + ********************************************************************/ +/******************************************************************** + * 2014 - + * open source under Apache License Version 2.0 + ********************************************************************/ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "DirectoryIterator.h" +#include "Exception.h" +#include "ExceptionInternal.h" +#include "FileSystem.h" +#include "FileSystemImpl.h" +#include "FileSystemKey.h" +#include "Hash.h" +#include "SessionConfig.h" +#include "Thread.h" +#include "Token.h" +#include "Unordered.h" +#include "WritableUtils.h" + +#include <algorithm> +#include <string> +#include <krb5/krb5.h> + +using namespace Hdfs::Internal; + +namespace Hdfs { + +namespace Internal { + +static std::string ExtractPrincipalFromTicketCache( + const std::string & cachePath) { + krb5_context cxt = NULL; + krb5_ccache ccache = NULL; + krb5_principal principal = NULL; + krb5_error_code ec = 0; + std::string errmsg, retval; + char * priName = NULL; + + if (!cachePath.empty()) { + if (0 != setenv("KRB5CCNAME", cachePath.c_str(), 1)) { + THROW(HdfsIOException, "Cannot set env parameter \"KRB5CCNAME\""); + } + } + + do { + if (0 != (ec = krb5_init_context(&cxt))) { + break; + } + + if (0 != (ec = krb5_cc_default(cxt, &ccache))) { + break; + } + + if (0 != (ec = krb5_cc_get_principal(cxt, ccache, &principal))) { + break; + } + + if (0 != (ec = krb5_unparse_name(cxt, principal, &priName))) { + break; + } + } while (0); + + if (!ec) { + retval = priName; + } else { + if (cxt) { + errmsg = krb5_get_error_message(cxt, ec); + } else { + errmsg = "Cannot initialize kerberos context"; + } + } + + if (priName != NULL) { + krb5_free_unparsed_name(cxt, priName); + } + + if (principal != NULL) { + krb5_free_principal(cxt, principal); + } + + if (ccache != NULL) { + krb5_cc_close(cxt, ccache); + } + + if (cxt != NULL) { + krb5_free_context(cxt); + } + + if (!errmsg.empty()) { + THROW(HdfsIOException, + "FileSystem: Failed to extract principal from ticket cache: %s", + errmsg.c_str()); + } + + return retval; +} + + +static std::string ExtractPrincipalFromToken(const Token & token) { + std::string realUser, owner; + std::string identifier = token.getIdentifier(); + WritableUtils cin(&identifier[0], identifier.size()); + char version; + + try { + version = cin.readByte(); + + if (version != 0) { + THROW(HdfsIOException, "Unknown version of delegation token"); + } + + owner = cin.ReadText(); + cin.ReadText(); + realUser = cin.ReadText(); + return realUser.empty() ? owner : realUser; + } catch (const std::range_error & e) { + } + + THROW(HdfsIOException, "Cannot extract principal from token"); +} +} + +FileSystem::FileSystem(const Config & conf) : + conf(conf), impl(NULL) { +} + +FileSystem::FileSystem(const FileSystem & other) : + conf(other.conf), impl(NULL) { + if (other.impl) { + impl = new FileSystemWrapper(other.impl->filesystem); + } +} + +FileSystem & FileSystem::operator =(const FileSystem & other) { + if (this == &other) { + return *this; + } + + conf = other.conf; + + if (impl) { + delete impl; + impl = NULL; + } + + if (other.impl) { + impl = new FileSystemWrapper(other.impl->filesystem); + } + + return *this; +} + +FileSystem::~FileSystem() { + if (impl) { + try { + disconnect(); + } catch (...) 
{ + } + } +} + +void FileSystem::connect() { + Internal::SessionConfig sconf(conf); + connect(sconf.getDefaultUri().c_str(), NULL, NULL); +} + +/** + * Connect to hdfs + * @param uri hdfs connection uri, hdfs://host:port + */ +void FileSystem::connect(const char * uri) { + connect(uri, NULL, NULL); +} + +static FileSystemWrapper * ConnectInternal(const char * uri, + const std::string & principal, const Token * token, Config & conf) { + if (NULL == uri || 0 == strlen(uri)) { + THROW(InvalidParameter, "Invalid HDFS uri."); + } + + FileSystemKey key(uri, principal.c_str()); + + if (token) { + key.addToken(*token); + } + + return new FileSystemWrapper(shared_ptr<FileSystemInter>(new FileSystemImpl(key, conf))); +} + +/** + * Connect to hdfs with user or token + * username and token cannot be set at the same time + * @param uri connection uri. + * @param username user used to connect to hdfs + * @param token token used to connect to hdfs + */ +void FileSystem::connect(const char * uri, const char * username, const char * token) { + AuthMethod auth; + std::string principal; + + if (impl) { + THROW(HdfsIOException, "FileSystem: already connected."); + } + + try { + SessionConfig sconf(conf); + auth = RpcAuth::ParseMethod(sconf.getRpcAuthMethod()); + + if (token && auth != AuthMethod::SIMPLE) { + Token t; + t.fromString(token); + principal = ExtractPrincipalFromToken(t); + impl = ConnectInternal(uri, principal, &t, conf); + impl->filesystem->connect(); + return; + } else if (username) { + principal = username; + } + + if (auth == AuthMethod::KERBEROS) { + principal = ExtractPrincipalFromTicketCache(sconf.getKerberosCachePath()); + } + + impl = ConnectInternal(uri, principal, NULL, conf); + impl->filesystem->connect(); + } catch (...) { + delete impl; + impl = NULL; + throw; + } +} + +/** + * disconnect from hdfs + */ +void FileSystem::disconnect() { + delete impl; + impl = NULL; +} + +/** + * To get default number of replication. + * @return the default number of replication. + */ +int FileSystem::getDefaultReplication() const { + if (!impl) { + THROW(HdfsIOException, "FileSystem: not connected."); + } + + return impl->filesystem->getDefaultReplication(); +} + +/** + * To get the default number of block size. + * @return the default block size. + */ +int64_t FileSystem::getDefaultBlockSize() const { + if (!impl) { + THROW(HdfsIOException, "FileSystem: not connected."); + } + + return impl->filesystem->getDefaultBlockSize(); +} + +/** + * To get the home directory. + * @return home directory. + */ +std::string FileSystem::getHomeDirectory() const { + if (!impl) { + THROW(HdfsIOException, "FileSystem: not connected."); + } + + return impl->filesystem->getHomeDirectory(); +} + +/** + * To delete a file or directory. + * @param path the path to be deleted. + * @param recursive if path is a directory, delete the contents recursively. + * @return return true if success. + */ +bool FileSystem::deletePath(const char * path, bool recursive) { + if (!impl) { + THROW(HdfsIOException, "FileSystem: not connected."); + } + + return impl->filesystem->deletePath(path, recursive); +} + +/** + * To create a directory which given permission. + * @param path the directory path which is to be created. + * @param permission directory permission. + * @return return true if success. 
+ */ +bool FileSystem::mkdir(const char * path, const Permission & permission) { + if (!impl) { + THROW(HdfsIOException, "FileSystem: not connected."); + } + + return impl->filesystem->mkdir(path, permission); +} + +/** + * To create a directory which given permission. + * If parent path does not exits, create it. + * @param path the directory path which is to be created. + * @param permission directory permission. + * @return return true if success. + */ +bool FileSystem::mkdirs(const char * path, const Permission & permission) { + if (!impl) { + THROW(HdfsIOException, "FileSystem: not connected."); + } + + return impl->filesystem->mkdirs(path, permission); +} + +/** + * To get path information. + * @param path the path which information is to be returned. + * @return the path information. + */ +FileStatus FileSystem::getFileStatus(const char * path) const { + if (!impl) { + THROW(HdfsIOException, "FileSystem: not connected."); + } + + return impl->filesystem->getFileStatus(path); +} + +/** + * Return an array containing hostnames, offset and size of + * portions of the given file. + * + * This call is most helpful with DFS, where it returns + * hostnames of machines that contain the given file. + * + * The FileSystem will simply return an elt containing 'localhost'. + * + * @param path path is used to identify an FS since an FS could have + * another FS that it could be delegating the call to + * @param start offset into the given file + * @param len length for which to get locations for + */ +std::vector<BlockLocation> FileSystem::getFileBlockLocations(const char * path, + int64_t start, int64_t len) { + if (!impl) { + THROW(HdfsIOException, "FileSystem: not connected."); + } + + return impl->filesystem->getFileBlockLocations(path, start, len); +} + +/** + * list the contents of a directory. + * @param path the directory path. + * @return Return a iterator to visit all elements in this directory. + */ +DirectoryIterator FileSystem::listDirectory(const char * path) { + if (!impl) { + THROW(HdfsIOException, "FileSystem: not connected."); + } + + return impl->filesystem->listDirectory(path, false); +} + +/** + * list all the contents of a directory. + * @param path The directory path. + * @return Return a vector of file informations in the directory. + */ +std::vector<FileStatus> FileSystem::listAllDirectoryItems(const char * path) { + if (!impl) { + THROW(HdfsIOException, "FileSystem: not connected."); + } + + return impl->filesystem->listAllDirectoryItems(path, false); +} + +/** + * To set the owner and the group of the path. + * username and groupname cannot be empty at the same time. + * @param path the path which owner of group is to be changed. + * @param username new user name. + * @param groupname new group. + */ +void FileSystem::setOwner(const char * path, const char * username, + const char * groupname) { + if (!impl) { + THROW(HdfsIOException, "FileSystem: not connected."); + } + + impl->filesystem->setOwner(path, username, groupname); +} + +/** + * To set the access time or modification time of a path. + * @param path the path which access time or modification time is to be changed. + * @param mtime new modification time. + * @param atime new access time. + */ +void FileSystem::setTimes(const char * path, int64_t mtime, int64_t atime) { + if (!impl) { + THROW(HdfsIOException, "FileSystem: not connected."); + } + + impl->filesystem->setTimes(path, mtime, atime); +} + +/** + * To set the permission of a path. + * @param path the path which permission is to be changed. 
+ * @param permission new permission. + */ +void FileSystem::setPermission(const char * path, + const Permission & permission) { + if (!impl) { + THROW(HdfsIOException, "FileSystem: not connected."); + } + + impl->filesystem->setPermission(path, permission); +} + +/** + * To set the number of replication. + * @param path the path which number of replication is to be changed. + * @param replication new number of replication. + * @return return true if success. + */ +bool FileSystem::setReplication(const char * path, short replication) { + if (!impl) { + THROW(HdfsIOException, "FileSystem: not connected."); + } + + return impl->filesystem->setReplication(path, replication); +} + +/** + * To rename a path. + * @param src old path. + * @param dst new path. + * @return return true if success. + */ +bool FileSystem::rename(const char * src, const char * dst) { + if (!impl) { + THROW(HdfsIOException, "FileSystem: not connected."); + } + + return impl->filesystem->rename(src, dst); +} + +/** + * To set working directory. + * @param path new working directory. + */ +void FileSystem::setWorkingDirectory(const char * path) { + if (!impl) { + THROW(HdfsIOException, "FileSystem: not connected."); + } + + impl->filesystem->setWorkingDirectory(path); +} + +/** + * To get working directory. + * @return working directory. + */ +std::string FileSystem::getWorkingDirectory() const { + if (!impl) { + THROW(HdfsIOException, "FileSystem: not connected."); + } + + return impl->filesystem->getWorkingDirectory(); +} + +/** + * To test if the path exist. + * @param path the path which is to be tested. + * @return return true if the path exist. + */ +bool FileSystem::exist(const char * path) const { + if (!impl) { + THROW(HdfsIOException, "FileSystem: not connected."); + } + + return impl->filesystem->exist(path); +} + +/** + * To get the file system status. + * @return the file system status. + */ +FileSystemStats FileSystem::getStats() const { + if (!impl) { + THROW(HdfsIOException, "FileSystem: not connected."); + } + + return impl->filesystem->getFsStats(); +} + +/** + * Truncate the file in the indicated path to the indicated size. + * @param src The path to the file to be truncated + * @param size The size the file is to be truncated to + * + * @return true if and client does not need to wait for block recovery, + * false if client needs to wait for block recovery. + */ +bool FileSystem::truncate(const char * src, int64_t size) { + if (!impl) { + THROW(HdfsIOException, "FileSystem: not connected."); + } + + return impl->filesystem->truncate(src, size); +} + +std::string FileSystem::getDelegationToken(const char * renewer) { + if (!impl) { + THROW(HdfsIOException, "FileSystem: not connected."); + } + + return impl->filesystem->getDelegationToken(renewer); +} + +/** + * Get a valid Delegation Token using the default user as renewer. + * + * @return Token + * @throws IOException + */ +std::string FileSystem::getDelegationToken() { + if (!impl) { + THROW(HdfsIOException, "FileSystem: not connected."); + } + + return impl->filesystem->getDelegationToken(); +} + +/** + * Renew an existing delegation token. + * + * @param token delegation token obtained earlier + * @return the new expiration time + * @throws IOException + */ +int64_t FileSystem::renewDelegationToken(const std::string & token) { + if (!impl) { + THROW(HdfsIOException, "FileSystem: not connected."); + } + + return impl->filesystem->renewDelegationToken(token); +} + +/** + * Cancel an existing delegation token. 
+ * + * @param token delegation token + * @throws IOException + */ +void FileSystem::cancelDelegationToken(const std::string & token) { + if (!impl) { + THROW(HdfsIOException, "FileSystem: not connected."); + } + + impl->filesystem->cancelDelegationToken(token); +} + +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/FileSystem.h ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/src/client/FileSystem.h b/depends/libhdfs3/src/client/FileSystem.h new file mode 100644 index 0000000..c74a9fd --- /dev/null +++ b/depends/libhdfs3/src/client/FileSystem.h @@ -0,0 +1,294 @@ +/******************************************************************** + * Copyright (c) 2013 - 2014, Pivotal Inc. + * All rights reserved. + * + * Author: Zhanwei Wang + ********************************************************************/ +/******************************************************************** + * 2014 - + * open source under Apache License Version 2.0 + ********************************************************************/ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef _HDFS_LIBHDFS3_CLIENT_FILESYSTEM_H_ +#define _HDFS_LIBHDFS3_CLIENT_FILESYSTEM_H_ + +#include "BlockLocation.h" +#include "DirectoryIterator.h" +#include "FileStatus.h" +#include "FileSystemStats.h" +#include "Permission.h" +#include "XmlConfig.h" + +#include <vector> + +namespace Hdfs { +namespace Internal { +struct FileSystemWrapper; +} + +class FileSystem { +public: + + /** + * Construct a FileSystem + * @param conf hdfs configuration + */ + FileSystem(const Config & conf); + + /** + * Copy construct of FileSystem + */ + FileSystem(const FileSystem & other); + + /** + * Assign operator of FileSystem + */ + FileSystem & operator = (const FileSystem & other); + + /** + * Destroy a HdfsFileSystem instance + */ + ~FileSystem(); + + /** + * Connect to default hdfs. + */ + void connect(); + + /** + * Connect to hdfs + * @param uri hdfs connection uri, hdfs://host:port + */ + void connect(const char * uri); + + /** + * Connect to hdfs with user or token + * username and token cannot be set at the same time + * @param uri connection uri. + * @param username user used to connect to hdfs + * @param token token used to connect to hdfs + */ + void connect(const char * uri, const char * username, const char * token); + + /** + * disconnect from hdfs + */ + void disconnect(); + + /** + * To get default number of replication. + * @return the default number of replication. + */ + int getDefaultReplication() const; + + /** + * To get the default number of block size. + * @return the default block size. + */ + int64_t getDefaultBlockSize() const; + + /** + * To get the home directory. 
+ * @return home directory. + */ + std::string getHomeDirectory() const; + + /** + * To delete a file or directory. + * @param path the path to be deleted. + * @param recursive if path is a directory, delete the contents recursively. + * @return return true if success. + */ + bool deletePath(const char * path, bool recursive); + + /** + * To create a directory which given permission. + * @param path the directory path which is to be created. + * @param permission directory permission. + * @return return true if success. + */ + bool mkdir(const char * path, const Permission & permission); + + /** + * To create a directory which given permission. + * If parent path does not exits, create it. + * @param path the directory path which is to be created. + * @param permission directory permission. + * @return return true if success. + */ + bool mkdirs(const char * path, const Permission & permission); + + /** + * To get path information. + * @param path the path which information is to be returned. + * @return the path information. + */ + FileStatus getFileStatus(const char * path) const; + + /** + * Return an array containing hostnames, offset and size of + * portions of the given file. + * + * This call is most helpful with DFS, where it returns + * hostnames of machines that contain the given file. + * + * The FileSystem will simply return an elt containing 'localhost'. + * + * @param path path is used to identify an FS since an FS could have + * another FS that it could be delegating the call to + * @param start offset into the given file + * @param len length for which to get locations for + */ + std::vector<BlockLocation> getFileBlockLocations(const char * path, + int64_t start, int64_t len); + + /** + * list the contents of a directory. + * @param path The directory path. + * @return Return a iterator to visit all elements in this directory. + */ + DirectoryIterator listDirectory(const char * path); + + /** + * list all the contents of a directory. + * @param path The directory path. + * @return Return a vector of file informations in the directory. + */ + std::vector<FileStatus> listAllDirectoryItems(const char * path); + + /** + * To set the owner and the group of the path. + * username and groupname cannot be empty at the same time. + * @param path the path which owner of group is to be changed. + * @param username new user name. + * @param groupname new group. + */ + void setOwner(const char * path, const char * username, + const char * groupname); + + /** + * To set the access time or modification time of a path. + * @param path the path which access time or modification time is to be changed. + * @param mtime new modification time. + * @param atime new access time. + */ + void setTimes(const char * path, int64_t mtime, int64_t atime); + + /** + * To set the permission of a path. + * @param path the path which permission is to be changed. + * @param permission new permission. + */ + void setPermission(const char * path, const Permission & permission); + + /** + * To set the number of replication. + * @param path the path which number of replication is to be changed. + * @param replication new number of replication. + * @return return true if success. + */ + bool setReplication(const char * path, short replication); + + /** + * To rename a path. + * @param src old path. + * @param dst new path. + * @return return true if success. + */ + bool rename(const char * src, const char * dst); + + /** + * To set working directory. + * @param path new working directory. 
+ */ + void setWorkingDirectory(const char * path); + + /** + * To get working directory. + * @return working directory. + */ + std::string getWorkingDirectory() const; + + /** + * To test if the path exist. + * @param path the path which is to be tested. + * @return return true if the path exist. + */ + bool exist(const char * path) const; + + /** + * To get the file system status. + * @return the file system status. + */ + FileSystemStats getStats() const; + + /** + * Truncate the file in the indicated path to the indicated size. + * @param src The path to the file to be truncated + * @param size The size the file is to be truncated to + * + * @return true if and client does not need to wait for block recovery, + * false if client needs to wait for block recovery. + */ + bool truncate(const char * src, int64_t size); + + /** + * Get a valid Delegation Token. + * + * @param renewer the designated renewer for the token + * @return Token string + * @throws IOException + */ + std::string getDelegationToken(const char * renewer); + + /** + * Get a valid Delegation Token using the default user as renewer. + * + * @return Token string + * @throws IOException + */ + std::string getDelegationToken(); + + /** + * Renew an existing delegation token. + * + * @param token delegation token obtained earlier + * @return the new expiration time + * @throws IOException + */ + int64_t renewDelegationToken(const std::string & token); + + /** + * Cancel an existing delegation token. + * + * @param token delegation token + * @throws IOException + */ + void cancelDelegationToken(const std::string & token); + +private: + Config conf; + Internal::FileSystemWrapper * impl; + + friend class InputStream; + friend class OutputStream; +}; + +} +#endif /* _HDFS_LIBHDFS3_CLIENT_FILESYSTEM_H_ */ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/FileSystemImpl.cpp ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/src/client/FileSystemImpl.cpp b/depends/libhdfs3/src/client/FileSystemImpl.cpp new file mode 100644 index 0000000..c9daeb3 --- /dev/null +++ b/depends/libhdfs3/src/client/FileSystemImpl.cpp @@ -0,0 +1,785 @@ +/******************************************************************** + * Copyright (c) 2013 - 2014, Pivotal Inc. + * All rights reserved. + * + * Author: Zhanwei Wang + ********************************************************************/ +/******************************************************************** + * 2014 - + * open source under Apache License Version 2.0 + ********************************************************************/ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "Atomic.h" +#include "BlockLocation.h" +#include "DirectoryIterator.h" +#include "Exception.h" +#include "ExceptionInternal.h" +#include "FileStatus.h" +#include "FileSystemImpl.h" +#include "FileSystemStats.h" +#include "InputStream.h" +#include "LeaseRenewer.h" +#include "Logger.h" +#include "OutputStream.h" +#include "OutputStreamImpl.h" +#include "server/LocatedBlocks.h" +#include "server/NamenodeInfo.h" +#include "server/NamenodeProxy.h" +#include "StringUtil.h" + +#include <cstring> +#include <inttypes.h> +#include <libxml/uri.h> +#include <strings.h> + +namespace Hdfs { +namespace Internal { + +static const std::string GetAbsPath(const std::string & prefix, + const std::string & path) { + if (path.empty()) { + return prefix; + } + + if ('/' == path[0]) { + return path; + } else { + return prefix + "/" + path; + } +} + +/* + * Return the canonical absolute name of file NAME. + * A canonical name does not contain any `.', `..' components nor any repeated path separators ('/') + */ +static const std::string CanonicalizePath(const std::string & path) { + int skip = 0; + std::string retval; + std::vector<std::string> components = StringSplit(path, "/"); + std::deque<std::string> tmp; + std::vector<std::string>::reverse_iterator s = components.rbegin(); + + while (s != components.rend()) { + if (s->empty() || *s == ".") { + ++s; + } else if (*s == "..") { + ++skip; + ++s; + } else { + if (skip <= 0) { + tmp.push_front(*s); + } else { + --skip; + } + + ++s; + } + } + + for (size_t i = 0; i < tmp.size(); ++i) { + retval += "/"; + retval += tmp[i]; + } + + return retval.empty() ? "/" : retval; +} + +FileSystemImpl::FileSystemImpl(const FileSystemKey& key, const Config& c) + : conf(c), + key(key), + openedOutputStream(0), + nn(NULL), + sconf(c), + user(key.getUser()) { + static atomic<uint32_t> count(0); + std::stringstream ss; + ss.imbue(std::locale::classic()); + srand((unsigned int) time(NULL)); + ss << "libhdfs3_client_random_" << rand() << "_count_" << ++count << "_pid_" + << getpid() << "_tid_" << pthread_self(); + clientName = ss.str(); + workingDir = std::string("/user/") + user.getEffectiveUser(); + peerCache = shared_ptr<PeerCache>(new PeerCache(sconf)); +#ifdef MOCK + stub = NULL; +#endif + //set log level + RootLogger.setLogSeverity(sconf.getLogSeverity()); +} + +/** + * Destroy a FileSystemBase instance + */ +FileSystemImpl::~FileSystemImpl() { + try { + disconnect(); + } catch (...) 
{ + } +} + +const std::string FileSystemImpl::getStandardPath(const char * path) { + std::string base; + { + lock_guard<mutex> lock(mutWorkingDir); + base = workingDir; + } + return CanonicalizePath(GetAbsPath(base, path)); +} + +const char * FileSystemImpl::getClientName() { + return clientName.c_str(); +} + +void FileSystemImpl::connect() { + std::string host, port, uri; + std::vector<NamenodeInfo> namenodeInfos; + + if (nn) { + THROW(HdfsIOException, "FileSystemImpl: already connected."); + } + + host = key.getHost(); + port = key.getPort(); + uri += key.getScheme() + "://" + host; + + if (port.empty()) { + try { + namenodeInfos = NamenodeInfo::GetHANamenodeInfo(key.getHost(), conf); + } catch (const HdfsConfigNotFound & e) { + NESTED_THROW(InvalidParameter, "Cannot parse URI: %s, missing port or invalid HA configuration", uri.c_str()); + } + + tokenService = "ha-hdfs:"; + tokenService += host; + } else { + std::stringstream ss; + ss.imbue(std::locale::classic()); + ss << host << ":" << port; + namenodeInfos.resize(1); + namenodeInfos[0].setRpcAddr(ss.str()); + tokenService = namenodeInfos[0].getRpcAddr(); + } + +#ifdef MOCK + nn = stub->getNamenode(); +#else + nn = new NamenodeProxy(namenodeInfos, tokenService, sconf, RpcAuth(user, RpcAuth::ParseMethod(sconf.getRpcAuthMethod()))); +#endif + /* + * To test if the connection is ok + */ + getFsStats(); +} + +/** + * disconnect from hdfs + */ +void FileSystemImpl::disconnect() { + if (nn) { + nn->close(); + delete nn; + } + + nn = NULL; +} + +/** + * To get default number of replication. + * @return the default number of replication. + */ +int FileSystemImpl::getDefaultReplication() const { + return sconf.getDefaultReplica(); +} + +/** + * To get the default number of block size. + * @return the default block size. + */ +int64_t FileSystemImpl::getDefaultBlockSize() const { + return sconf.getDefaultBlockSize(); +} + +/** + * To get the home directory. + * @return home directory. + */ +std::string FileSystemImpl::getHomeDirectory() const { + return std::string("/user/") + user.getEffectiveUser(); +} + +/** + * To delete a file or directory. + * @param path the path to be deleted. + * @param recursive if path is a directory, delete the contents recursively. + * @return return true if success. + */ + +bool FileSystemImpl::deletePath(const char * path, bool recursive) { + if (!nn) { + THROW(HdfsIOException, "FileSystemImpl: not connected."); + } + + if (NULL == path || !strlen(path)) { + THROW(InvalidParameter, "Invalid input: path should not be empty"); + } + + return nn->deleteFile(getStandardPath(path), recursive); +} + +/** + * To create a directory which given permission. + * @param path the directory path which is to be created. + * @param permission directory permission. + * @return return true if success. + */ + +bool FileSystemImpl::mkdir(const char * path, const Permission & permission) { + if (!nn) { + THROW(HdfsIOException, "FileSystemImpl: not connected."); + } + + if (NULL == path || !strlen(path)) { + THROW(InvalidParameter, "Invalid input: path should not be empty"); + } + + return nn->mkdirs(getStandardPath(path), permission, false); +} + +/** + * To create a directory which given permission. + * If parent path does not exits, create it. + * @param path the directory path which is to be created. + * @param permission directory permission. + * @return return true if success. 
+ */ + +bool FileSystemImpl::mkdirs(const char * path, const Permission & permission) { + if (!nn) { + THROW(HdfsIOException, "FileSystemImpl: not connected."); + } + + if (NULL == path || !strlen(path)) { + THROW(InvalidParameter, "Invalid input: path should not be empty"); + } + + return nn->mkdirs(getStandardPath(path), permission, true); +} + +/** + * To get path information. + * @param path the path which information is to be returned. + * @return the path information. + */ +FileStatus FileSystemImpl::getFileStatus(const char * path) { + if (!nn) { + THROW(HdfsIOException, "FileSystemImpl: not connected."); + } + + if (NULL == path || !strlen(path)) { + THROW(InvalidParameter, "Invalid input: path should not be empty"); + } + + return nn->getFileInfo(getStandardPath(path), NULL); +} + +static void Convert(BlockLocation & bl, const LocatedBlock & lb) { + const std::vector<DatanodeInfo> & nodes = lb.getLocations(); + bl.setCorrupt(lb.isCorrupt()); + bl.setLength(lb.getNumBytes()); + bl.setOffset(lb.getOffset()); + std::vector<std::string> hosts(nodes.size()); + std::vector<std::string> names(nodes.size()); + std::vector<std::string> topologyPaths(nodes.size()); + + for (size_t i = 0 ; i < nodes.size() ; ++i) { + hosts[i] = nodes[i].getHostName(); + names[i] = nodes[i].getXferAddr(); + topologyPaths[i] = nodes[i].getLocation() + '/' + nodes[i].getXferAddr(); + } + + bl.setNames(names); + bl.setHosts(hosts); + bl.setTopologyPaths(topologyPaths); +} + +std::vector<BlockLocation> FileSystemImpl::getFileBlockLocations( + const char * path, int64_t start, int64_t len) { + if (!nn) { + THROW(HdfsIOException, "FileSystemImpl: not connected."); + } + + if (NULL == path || !strlen(path)) { + THROW(InvalidParameter, "Invalid input: path should not be empty"); + } + + if (start < 0) { + THROW(InvalidParameter, "Invalid input: start offset should be positive"); + } + + if (len < 0) { + THROW(InvalidParameter, "Invalid input: length should be positive"); + } + + LocatedBlocksImpl lbs; + nn->getBlockLocations(getStandardPath(path), start, len, lbs); + std::vector<LocatedBlock> blocks = lbs.getBlocks(); + std::vector<BlockLocation> retval(blocks.size()); + + for (size_t i = 0; i < blocks.size(); ++i) { + Convert(retval[i], blocks[i]); + } + + return retval; +} + +/** + * list the contents of a directory. + * @param path the directory path. + * @return return the path informations in the given directory. + */ +DirectoryIterator FileSystemImpl::listDirectory(const char * path, + bool needLocation) { + if (!nn) { + THROW(HdfsIOException, "FileSystemImpl: not connected."); + } + + if (NULL == path || !strlen(path)) { + THROW(InvalidParameter, "Invalid input: path should not be empty"); + } + + return DirectoryIterator(this, getStandardPath(path), needLocation); +} + +/** + * list all the contents of a directory. + * @param path The directory path. + * @return Return a vector of file informations in the directory. + */ +std::vector<FileStatus> FileSystemImpl::listAllDirectoryItems(const char * path, + bool needLocation) { + if (!nn) { + THROW(HdfsIOException, "FileSystemImpl: not connected."); + } + + if (NULL == path || !strlen(path)) { + THROW(InvalidParameter, "Invalid input: path should not be empty"); + } + + std::string startAfter; + std::string p = getStandardPath(path); + std::vector<FileStatus> retval; + + while (getListing(p, startAfter, needLocation, retval)) { + startAfter = retval.back().getPath(); + } + + return retval; +} + +/** + * To set the owner and the group of the path. 
+/**
+ * To set the owner and the group of a path.
+ * username and groupname cannot both be empty.
+ * @param path the path whose owner or group is to be changed.
+ * @param username new user name.
+ * @param groupname new group name.
+ */
+void FileSystemImpl::setOwner(const char * path, const char * username,
+                              const char * groupname) {
+    if (!nn) {
+        THROW(HdfsIOException, "FileSystemImpl: not connected.");
+    }
+
+    if (NULL == path || !strlen(path)) {
+        THROW(InvalidParameter, "Invalid input: path should not be empty");
+    }
+
+    if ((NULL == username || !strlen(username))
+            && (NULL == groupname || !strlen(groupname))) {
+        THROW(InvalidParameter,
+              "Invalid input: username and groupname should not both be empty");
+    }
+
+    nn->setOwner(getStandardPath(path), username != NULL ? username : "",
+                 groupname != NULL ? groupname : "");
+}
+
+/**
+ * To set the access time or modification time of a path.
+ * @param path the path whose access time or modification time is to be changed.
+ * @param mtime new modification time.
+ * @param atime new access time.
+ */
+void FileSystemImpl::setTimes(const char * path, int64_t mtime, int64_t atime) {
+    if (!nn) {
+        THROW(HdfsIOException, "FileSystemImpl: not connected.");
+    }
+
+    if (NULL == path || !strlen(path)) {
+        THROW(InvalidParameter, "Invalid input: path should not be empty");
+    }
+
+    nn->setTimes(getStandardPath(path), mtime, atime);
+}
+
+/**
+ * To set the permission of a path.
+ * @param path the path whose permission is to be changed.
+ * @param permission new permission.
+ */
+void FileSystemImpl::setPermission(const char * path,
+                                   const Permission & permission) {
+    if (!nn) {
+        THROW(HdfsIOException, "FileSystemImpl: not connected.");
+    }
+
+    if (NULL == path || !strlen(path)) {
+        THROW(InvalidParameter, "Invalid input: path should not be empty");
+    }
+
+    nn->setPermission(getStandardPath(path), permission);
+}
+
+/**
+ * To set the replication factor of a path.
+ * @param path the path whose replication factor is to be changed.
+ * @param replication new replication factor.
+ * @return true on success.
+ */
+
+bool FileSystemImpl::setReplication(const char * path, short replication) {
+    if (!nn) {
+        THROW(HdfsIOException, "FileSystemImpl: not connected.");
+    }
+
+    if (NULL == path || !strlen(path)) {
+        THROW(InvalidParameter, "Invalid input: path should not be empty");
+    }
+
+    return nn->setReplication(getStandardPath(path), replication);
+}
+
+/**
+ * To rename a path.
+ * @param src old path.
+ * @param dst new path.
+ * @return true on success.
+ */
+
+bool FileSystemImpl::rename(const char * src, const char * dst) {
+    if (!nn) {
+        THROW(HdfsIOException, "FileSystemImpl: not connected.");
+    }
+
+    if (NULL == src || !strlen(src)) {
+        THROW(InvalidParameter, "Invalid input: src should not be empty");
+    }
+
+    if (NULL == dst || !strlen(dst)) {
+        THROW(InvalidParameter, "Invalid input: dst should not be empty");
+    }
+
+    return nn->rename(getStandardPath(src), getStandardPath(dst));
+}
+
+/**
+ * To set the working directory.
+ * @param path new working directory.
+ */
+void FileSystemImpl::setWorkingDirectory(const char * path) {
+    if (NULL == path) {
+        THROW(InvalidParameter, "Invalid input: path should not be empty");
+    }
+
+    if (!strlen(path) || '/' != path[0]) {
+        THROW(InvalidParameter,
+              "Invalid input: path should be an absolute path");
+    }
+
+    lock_guard<mutex> lock(mutWorkingDir);
+    workingDir = path;
+}
+
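setWorkingDirectory() only stores the new directory under mutWorkingDir; it is getStandardPath() that later resolves relative names against it, so the metadata calls above can be driven with relative paths. A small illustrative sketch, not from this patch, assuming an already-connected FileSystemImpl, a Permission(mode) constructor, and the header names used below:

    // Sketch only: Permission(0644) and the header names are assumptions.
    #include <cstddef>

    #include "FileSystemImpl.h"
    #include "Permission.h"

    using Hdfs::Permission;
    using Hdfs::Internal::FileSystemImpl;

    static void publishReport(FileSystemImpl & fs) {
        // Relative paths below are resolved against this directory
        // by getStandardPath().
        fs.setWorkingDirectory("/user/alice/reports");

        // Both src and dst are canonicalized before the rename RPC.
        if (fs.rename("report.tmp", "report.csv")) {
            fs.setPermission("report.csv", Permission(0644));
            // Change the group only; username may be NULL when unchanged.
            fs.setOwner("report.csv", NULL, "analytics");
        }
    }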
+/**
+ * To get the working directory.
+ * @return the working directory.
+ */
+std::string FileSystemImpl::getWorkingDirectory() const {
+    return workingDir;
+}
+
+/**
+ * To test whether a path exists.
+ * @param path the path to be tested.
+ * @return true if the path exists.
+ */
+
+bool FileSystemImpl::exist(const char * path) {
+    if (!nn) {
+        THROW(HdfsIOException, "FileSystemImpl: not connected.");
+    }
+
+    if (NULL == path || !strlen(path)) {
+        THROW(InvalidParameter, "Invalid input: path should not be empty");
+    }
+
+    try {
+        bool retval = true;
+        nn->getFileInfo(getStandardPath(path), &retval);
+        return retval;
+    } catch (const FileNotFoundException & e) {
+        return false;
+    }
+}
+
+/**
+ * To get the file system status.
+ * @return the file system status.
+ */
+FileSystemStats FileSystemImpl::getFsStats() {
+    if (!nn) {
+        THROW(HdfsIOException, "FileSystemImpl: not connected.");
+    }
+
+    std::vector<int64_t> retval = nn->getFsStats();
+    assert(retval.size() >= 3);
+    return FileSystemStats(retval[0], retval[1], retval[2]);
+}
+
+/**
+ * Truncate the file at the indicated path to the indicated size.
+ * @param path the path to the file to be truncated.
+ * @param size the size the file is to be truncated to.
+ *
+ * @return true if the client does not need to wait for block recovery,
+ * false if the client needs to wait for block recovery.
+ */
+bool FileSystemImpl::truncate(const char * path, int64_t size) {
+    LOG(DEBUG1, "truncate file %s to length %" PRId64, path, size);
+
+    if (!nn) {
+        THROW(HdfsIOException, "FileSystemImpl: not connected.");
+    }
+
+    if (NULL == path || !strlen(path)) {
+        THROW(InvalidParameter, "Invalid input: path should not be empty.");
+    }
+
+    std::string absPath = getStandardPath(path);
+
+    return nn->truncate(absPath, size, clientName);
+}
+
+std::string FileSystemImpl::getDelegationToken(const char * renewer) {
+    if (!nn) {
+        THROW(HdfsIOException, "FileSystemImpl: not connected.");
+    }
+
+    if (NULL == renewer || !strlen(renewer)) {
+        THROW(InvalidParameter, "Invalid input: renewer should not be empty.");
+    }
+
+    Token retval = nn->getDelegationToken(renewer);
+    retval.setService(tokenService);
+    return retval.toString();
+}
+
+std::string FileSystemImpl::getDelegationToken() {
+    return getDelegationToken(key.getUser().getPrincipal().c_str());
+}
+
+int64_t FileSystemImpl::renewDelegationToken(const std::string & token) {
+    if (!nn) {
+        THROW(HdfsIOException, "FileSystemImpl: not connected.");
+    }
+
+    Token t;
+    t.fromString(token);
+    return nn->renewDelegationToken(t);
+}
+
+void FileSystemImpl::cancelDelegationToken(const std::string & token) {
+    if (!nn) {
+        THROW(HdfsIOException, "FileSystemImpl: not connected.");
+    }
+
+    Token t;
+    t.fromString(token);
+    nn->cancelDelegationToken(t);
+}
+
+void FileSystemImpl::getBlockLocations(const std::string & src, int64_t offset,
+                                       int64_t length, LocatedBlocks & lbs) {
+    if (!nn) {
+        THROW(HdfsIOException, "FileSystemImpl: not connected.");
+    }
+
+    nn->getBlockLocations(src, offset, length, lbs);
+}
+
+void FileSystemImpl::create(const std::string & src, const Permission & masked,
+                            int flag, bool createParent, short replication, int64_t blockSize) {
+    if (!nn) {
+        THROW(HdfsIOException, "FileSystemImpl: not connected.");
+    }
+
+    nn->create(src, masked, clientName, flag, createParent, replication,
+               blockSize);
+}
+
+std::pair<shared_ptr<LocatedBlock>, shared_ptr<FileStatus> >
+FileSystemImpl::append(const std::string& src) {
+    if (!nn) {
+        THROW(HdfsIOException, "FileSystemImpl: not connected.");
+    }
+
+    return nn->append(src, clientName);
+}
+
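The delegation-token methods above return and accept the token in its serialized string form, with the service field filled in from the tokenService computed in connect() (either the ha-hdfs:<nameservice> form or host:port). A hedged round-trip sketch follows; it assumes an already-connected FileSystemImpl, that Hdfs::HdfsException is the base exception type from Exception.h, and the renewer name and header names are placeholders.

    // Sketch only: the renewer name "yarn" and the header names are assumptions.
    #include <iostream>
    #include <string>

    #include "Exception.h"
    #include "FileSystemImpl.h"

    using Hdfs::HdfsException;
    using Hdfs::Internal::FileSystemImpl;

    static void tokenRoundTrip(FileSystemImpl & fs) {
        // getDelegationToken() stamps the token with tokenService before
        // serializing it, so the same string can be handed to renew/cancel.
        std::string token = fs.getDelegationToken("yarn");

        try {
            int64_t newExpiry = fs.renewDelegationToken(token);
            std::cout << "token renewed until " << newExpiry << std::endl;
        } catch (const HdfsException & e) {
            std::cerr << "renew failed: " << e.what() << std::endl;
        }

        // Invalidate the token once it is no longer needed.
        fs.cancelDelegationToken(token);
    }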
+void FileSystemImpl::abandonBlock(const ExtendedBlock & b,
+                                  const std::string & src) {
+    if (!nn) {
+        THROW(HdfsIOException, "FileSystemImpl: not connected.");
+    }
+
+    nn->abandonBlock(b, src, clientName);
+}
+
+shared_ptr<LocatedBlock> FileSystemImpl::addBlock(const std::string & src,
+        const ExtendedBlock * previous,
+        const std::vector<DatanodeInfo> & excludeNodes) {
+    if (!nn) {
+        THROW(HdfsIOException, "FileSystemImpl: not connected.");
+    }
+
+    return nn->addBlock(src, clientName, previous, excludeNodes);
+}
+
+shared_ptr<LocatedBlock> FileSystemImpl::getAdditionalDatanode(
+    const std::string & src, const ExtendedBlock & blk,
+    const std::vector<DatanodeInfo> & existings,
+    const std::vector<std::string> & storageIDs,
+    const std::vector<DatanodeInfo> & excludes, int numAdditionalNodes) {
+    if (!nn) {
+        THROW(HdfsIOException, "FileSystemImpl: not connected.");
+    }
+
+    return nn->getAdditionalDatanode(src, blk, existings, storageIDs, excludes,
+                                     numAdditionalNodes, clientName);
+}
+
+bool FileSystemImpl::complete(const std::string & src,
+                              const ExtendedBlock * last) {
+    if (!nn) {
+        THROW(HdfsIOException, "FileSystemImpl: not connected.");
+    }
+
+    return nn->complete(src, clientName, last);
+}
+
+/*void FileSystemImpl::reportBadBlocks(const std::vector<LocatedBlock> & blocks) {
+    if (!nn) {
+        THROW(HdfsIOException, "FileSystemImpl: not connected.");
+    }
+
+    nn->reportBadBlocks(blocks);
+}*/
+
+void FileSystemImpl::fsync(const std::string & src) {
+    if (!nn) {
+        THROW(HdfsIOException, "FileSystemImpl: not connected.");
+    }
+
+    nn->fsync(src, clientName);
+}
+
+shared_ptr<LocatedBlock> FileSystemImpl::updateBlockForPipeline(
+    const ExtendedBlock & block) {
+    if (!nn) {
+        THROW(HdfsIOException, "FileSystemImpl: not connected.");
+    }
+
+    return nn->updateBlockForPipeline(block, clientName);
+}
+
+void FileSystemImpl::updatePipeline(const ExtendedBlock & oldBlock,
+                                    const ExtendedBlock & newBlock,
+                                    const std::vector<DatanodeInfo> & newNodes,
+                                    const std::vector<std::string> & storageIDs) {
+    if (!nn) {
+        THROW(HdfsIOException, "FileSystemImpl: not connected.");
+    }
+
+    nn->updatePipeline(clientName, oldBlock, newBlock, newNodes, storageIDs);
+}
+
+bool FileSystemImpl::getListing(const std::string & src,
+                                const std::string & startAfter, bool needLocation,
+                                std::vector<FileStatus> & dl) {
+    if (!nn) {
+        THROW(HdfsIOException, "FileSystemImpl: not connected.");
+    }
+
+    return nn->getListing(src, startAfter, needLocation, dl);
+}
+
+bool FileSystemImpl::renewLease() {
+    if (!nn) {
+        THROW(HdfsIOException, "FileSystemImpl: not connected.");
+    }
+
+    // protected by LeaseRenewer's lock
+    if (0 == openedOutputStream) {
+        return false;
+    }
+
+    try {
+        nn->renewLease(clientName);
+        return true;
+    } catch (const HdfsException & e) {
+        std::string buffer;
+        LOG(LOG_ERROR,
+            "Failed to renew lease for filesystem with client name %s, since:\n%s",
+            getClientName(), GetExceptionDetail(e, buffer));
+    } catch (const std::exception & e) {
+        LOG(LOG_ERROR,
+            "Failed to renew lease for filesystem with client name %s, since:\n%s",
+            getClientName(), e.what());
+    }
+
+    return false;
+}
+
+void FileSystemImpl::registerOpenedOutputStream() {
+    // protected by LeaseRenewer's lock
+    ++openedOutputStream;
+}
+
+bool FileSystemImpl::unregisterOpenedOutputStream() {
+    // protected by LeaseRenewer's lock
+    if (openedOutputStream > 0) {
+        --openedOutputStream;
+    }
+
+    return openedOutputStream == 0;
+}
+
+}
+}
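renewLease() and the openedOutputStream counter are the hooks the lease renewer uses: output streams register themselves while open, and renewal is skipped (it returns false) once the count drops to zero. The toy loop below only illustrates that contract; it is not the real LeaseRenewer (which owns the lock the comments refer to and derives its interval from the lease timeout), and the header name is an assumption.

    // Sketch only: illustrates the register/renew/unregister contract; the
    // real LeaseRenewer provides the locking the comments above rely on.
    #include <unistd.h>

    #include "FileSystemImpl.h"

    using Hdfs::Internal::FileSystemImpl;

    // Called when an output stream on this filesystem is opened or closed.
    static void onStreamOpen(FileSystemImpl & fs) {
        fs.registerOpenedOutputStream();
    }

    static void onStreamClose(FileSystemImpl & fs) {
        fs.unregisterOpenedOutputStream();
    }

    // Periodic renewal: renewLease() is a cheap no-op (returns false)
    // while no output stream is registered.
    static void leaseLoop(FileSystemImpl & fs, volatile bool & stop) {
        while (!stop) {
            fs.renewLease();
            sleep(1);   // the real renewer paces itself from the lease timeout
        }
    }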