http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/OutputStreamImpl.cpp ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/src/client/OutputStreamImpl.cpp b/depends/libhdfs3/src/client/OutputStreamImpl.cpp new file mode 100644 index 0000000..0c9f813 --- /dev/null +++ b/depends/libhdfs3/src/client/OutputStreamImpl.cpp @@ -0,0 +1,642 @@ +/******************************************************************** + * Copyright (c) 2013 - 2014, Pivotal Inc. + * All rights reserved. + * + * Author: Zhanwei Wang + ********************************************************************/ +/******************************************************************** + * 2014 - + * open source under Apache License Version 2.0 + ********************************************************************/ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "Atomic.h" +#include "DateTime.h" +#include "Exception.h" +#include "ExceptionInternal.h" +#include "FileSystemInter.h" +#include "HWCrc32c.h" +#include "LeaseRenewer.h" +#include "Logger.h" +#include "OutputStream.h" +#include "OutputStreamImpl.h" +#include "Packet.h" +#include "PacketHeader.h" +#include "SWCrc32c.h" + +#include <cassert> +#include <inttypes.h> + +namespace Hdfs { +namespace Internal { + +OutputStreamImpl::OutputStreamImpl() : +/*heartBeatStop(true),*/ closed(true), isAppend(false), syncBlock(false), checksumSize(0), chunkSize( + 0), chunksPerPacket(0), closeTimeout(0), heartBeatInterval(0), packetSize(0), position( + 0), replication(0), blockSize(0), bytesWritten(0), cursor(0), lastFlushed( + 0), nextSeqNo(0), packets(0) { + if (HWCrc32c::available()) { + checksum = shared_ptr < Checksum > (new HWCrc32c()); + } else { + checksum = shared_ptr < Checksum > (new SWCrc32c()); + } + + checksumSize = sizeof(int32_t); + lastSend = steady_clock::now(); +#ifdef MOCK + stub = NULL; +#endif +} + +OutputStreamImpl::~OutputStreamImpl() { + if (!closed) { + try { + close(); + } catch (...) { + } + } +} + +void OutputStreamImpl::checkStatus() { + if (closed) { + THROW(HdfsIOException, "OutputStreamImpl: stream is not opened."); + } + + lock_guard < mutex > lock(mut); + + if (lastError != exception_ptr()) { + rethrow_exception(lastError); + } +} + +void OutputStreamImpl::setError(const exception_ptr & error) { + try { + lock_guard < mutex > lock(mut); + lastError = error; + } catch (...) { + } +} + +/** + * To create or append a file. + * @param fs hdfs file system. + * @param path the file path. + * @param flag creation flag, can be Create, Append or Create|Overwrite. + * @param permission create a new file with given permission. + * @param createParent if the parent does not exist, create it. 
+ * @param replication create a file with given number of replication. + * @param blockSize create a file with given block size. + */ +void OutputStreamImpl::open(shared_ptr<FileSystemInter> fs, const char * path, int flag, + const Permission & permission, bool createParent, int replication, + int64_t blockSize) { + if (NULL == path || 0 == strlen(path) || replication < 0 || blockSize < 0) { + THROW(InvalidParameter, "Invalid parameter."); + } + + if (!(flag == Create || flag == (Create | SyncBlock) || flag == Overwrite + || flag == (Overwrite | SyncBlock) || flag == Append + || flag == (Append | SyncBlock) || flag == (Create | Overwrite) + || flag == (Create | Overwrite | SyncBlock) + || flag == (Create | Append) + || flag == (Create | Append | SyncBlock))) { + THROW(InvalidParameter, "Invalid flag."); + } + + try { + openInternal(fs, path, flag, permission, createParent, replication, + blockSize); + } catch (...) { + reset(); + throw; + } +} + +void OutputStreamImpl::computePacketChunkSize() { + int chunkSizeWithChecksum = chunkSize + checksumSize; + static const int packetHeaderSize = PacketHeader::GetPkgHeaderSize(); + chunksPerPacket = + (packetSize - packetHeaderSize + chunkSizeWithChecksum - 1) + / chunkSizeWithChecksum; + chunksPerPacket = chunksPerPacket > 1 ? chunksPerPacket : 1; + packetSize = chunksPerPacket * chunkSizeWithChecksum + packetHeaderSize; + buffer.resize(chunkSize); +} + +void OutputStreamImpl::initAppend() { + FileStatus fileInfo; + std::pair<shared_ptr<LocatedBlock>, shared_ptr<FileStatus> > lastBlockWithStatus; + lastBlockWithStatus = filesystem->append(this->path); + lastBlock = lastBlockWithStatus.first; + + if (lastBlockWithStatus.second) { + fileInfo = *lastBlockWithStatus.second; + } else { + fileInfo = filesystem->getFileStatus(this->path.c_str()); + } + + closed = false; + + try { + this->blockSize = fileInfo.getBlockSize(); + cursor = fileInfo.getLength(); + + if (lastBlock) { + isAppend = true; + bytesWritten = lastBlock->getNumBytes(); + int64_t usedInLastBlock = fileInfo.getLength() % blockSize; + int64_t freeInLastBlock = blockSize - usedInLastBlock; + + if (freeInLastBlock == this->blockSize) { + THROW(HdfsIOException, + "OutputStreamImpl: the last block for file %s is full.", + this->path.c_str()); + } + + int usedInCksum = cursor % chunkSize; + int freeInCksum = chunkSize - usedInCksum; + + if (usedInCksum > 0 && freeInCksum > 0) { + /* + * if there is space in the last partial chunk, then + * setup in such a way that the next packet will have only + * one chunk that fills up the partial chunk. + */ + packetSize = 0; + chunkSize = freeInCksum; + } else { + /* + * if the remaining space in the block is smaller than + * that expected size of of a packet, then create + * smaller size packet. + */ + packetSize = + packetSize < freeInLastBlock ? + packetSize : static_cast<int>(freeInLastBlock); + } + } + } catch (...) { + completeFile(false); + reset(); + throw; + } + + computePacketChunkSize(); +} + +void OutputStreamImpl::openInternal(shared_ptr<FileSystemInter> fs, const char * path, + int flag, const Permission & permission, bool createParent, + int replication, int64_t blockSize) { + filesystem = fs; + this->path = fs->getStandardPath(path); + this->replication = replication; + this->blockSize = blockSize; + syncBlock = flag & SyncBlock; + conf = shared_ptr < SessionConfig > (new SessionConfig(fs->getConf())); + LOG(DEBUG2, "open file %s for %s", this->path.c_str(), (flag & Append ? 
"append" : "write")); + packets.setMaxSize(conf->getPacketPoolSize()); + + if (0 == replication) { + this->replication = conf->getDefaultReplica(); + } else { + this->replication = replication; + } + + if (0 == blockSize) { + this->blockSize = conf->getDefaultBlockSize(); + } else { + this->blockSize = blockSize; + } + + chunkSize = conf->getDefaultChunkSize(); + packetSize = conf->getDefaultPacketSize(); + heartBeatInterval = conf->getHeartBeatInterval(); + closeTimeout = conf->getCloseFileTimeout(); + + if (packetSize < chunkSize) { + THROW(InvalidParameter, + "OutputStreamImpl: packet size %d is less than the chunk size %d.", + packetSize, chunkSize); + } + + if (0 != this->blockSize % chunkSize) { + THROW(InvalidParameter, + "OutputStreamImpl: block size %" PRId64 " is not the multiply of chunk size %d.", + this->blockSize, chunkSize); + } + + try { + if (flag & Append) { + initAppend(); + LeaseRenewer::GetLeaseRenewer().StartRenew(filesystem); + return; + } + } catch (const FileNotFoundException & e) { + if (!(flag & Create)) { + throw; + } + } + + assert((flag & Create) || (flag & Overwrite)); + fs->create(this->path, permission, flag, createParent, this->replication, + this->blockSize); + closed = false; + computePacketChunkSize(); + LeaseRenewer::GetLeaseRenewer().StartRenew(filesystem); +} + +/** + * To append data to file. + * @param buf the data used to append. + * @param size the data size. + */ +void OutputStreamImpl::append(const char * buf, int64_t size) { + LOG(DEBUG3, "append file %s size is %" PRId64 ", offset %" PRId64 " next pos %" PRId64, path.c_str(), size, cursor, size + cursor); + + if (NULL == buf || size < 0) { + THROW(InvalidParameter, "Invalid parameter."); + } + + checkStatus(); + + try { + appendInternal(buf, size); + } catch (...) { + setError(current_exception()); + throw; + } +} + +void OutputStreamImpl::appendInternal(const char * buf, int64_t size) { + int64_t todo = size; + + while (todo > 0) { + int batch = buffer.size() - position; + batch = batch < todo ? batch : static_cast<int>(todo); + + /* + * bypass buffer. 
+ */ + if (0 == position && todo >= static_cast<int64_t>(buffer.size())) { + checksum->update(buf + size - todo, batch); + appendChunkToPacket(buf + size - todo, batch); + bytesWritten += batch; + checksum->reset(); + } else { + checksum->update(buf + size - todo, batch); + memcpy(&buffer[position], buf + size - todo, batch); + position += batch; + + if (position == static_cast<int>(buffer.size())) { + appendChunkToPacket(&buffer[0], buffer.size()); + bytesWritten += buffer.size(); + checksum->reset(); + position = 0; + } + } + + todo -= batch; + + if (currentPacket + && (currentPacket->isFull() || bytesWritten == blockSize)) { + sendPacket(currentPacket); + + if (isAppend) { + isAppend = false; + chunkSize = conf->getDefaultChunkSize(); + packetSize = conf->getDefaultPacketSize(); + computePacketChunkSize(); + } + + if (bytesWritten == blockSize) { + closePipeline(); + } + } + } + + cursor += size; +} + +void OutputStreamImpl::appendChunkToPacket(const char * buf, int size) { + assert(NULL != buf && size > 0); + + if (!currentPacket) { + currentPacket = packets.getPacket(packetSize, chunksPerPacket, bytesWritten, + nextSeqNo++, checksumSize); + } + + currentPacket->addChecksum(checksum->getValue()); + currentPacket->addData(buf, size); + currentPacket->increaseNumChunks(); +} + +void OutputStreamImpl::sendPacket(shared_ptr<Packet> packet) { + if (!pipeline) { + setupPipeline(); + } + + pipeline->send(currentPacket); + currentPacket.reset(); + lastSend = steady_clock::now(); +} + +void OutputStreamImpl::setupPipeline() { + assert(currentPacket); +#ifdef MOCK + pipeline = stub->getPipeline(); +#else + pipeline = shared_ptr<Pipeline>(new PipelineImpl(isAppend, path.c_str(), *conf, filesystem, + CHECKSUM_TYPE_CRC32C, conf->getDefaultChunkSize(), replication, + currentPacket->getOffsetInBlock(), packets, lastBlock)); +#endif + lastSend = steady_clock::now(); + /* + * start heart beat beat thread + */ + /*if (heartBeatStop) { + if (heartBeatSender.joinable()) { + heartBeatSender.join(); + } + + heartBeatStop = false; + heartBeatSender = thread(&OutputStreamImpl::heartBeatSenderRoutine, this); + }*/ +} + +/** + * Flush all data in buffer and waiting for ack. + * Will block until get all acks. + */ +void OutputStreamImpl::flush() { + LOG(DEBUG3, "flush file %s at offset %" PRId64, path.c_str(), cursor); + checkStatus(); + + try { + flushInternal(false); + } catch (...) { + setError(current_exception()); + throw; + } +} + +void OutputStreamImpl::flushInternal(bool needSync) { + if (lastFlushed == cursor && !needSync) { + return; + } else { + lastFlushed = cursor; + } + + if (position > 0) { + appendChunkToPacket(&buffer[0], position); + } + + /* + * if the pipeline and currentPacket are both NULL, + * that means the pipeline has been closed and no more data in buffer/packet. + * already synced when closing pipeline. + */ + if (!currentPacket && needSync && pipeline) { + currentPacket = packets.getPacket(packetSize, chunksPerPacket, bytesWritten, + nextSeqNo++, checksumSize); + } + + lock_guard < mutex > lock(mut); + + if (currentPacket) { + currentPacket->setSyncFlag(needSync); + sendPacket(currentPacket); + } + + if (pipeline) { + pipeline->flush(); + } +} + +/** + * return the current file length. + * @return current file length. 
+ */ +int64_t OutputStreamImpl::tell() { + checkStatus(); + return cursor; +} + +/** + * @ref OutputStream::sync + */ +void OutputStreamImpl::sync() { + LOG(DEBUG3, "sync file %s at offset %" PRId64, path.c_str(), cursor); + checkStatus(); + + try { + flushInternal(true); + } catch (...) { + setError(current_exception()); + throw; + } +} + +void OutputStreamImpl::completeFile(bool throwError) { + steady_clock::time_point start = steady_clock::now(); + + while (true) { + try { + bool success; + success = filesystem->complete(path, lastBlock.get()); + + if (success) { + return; + } + } catch (HdfsIOException & e) { + if (throwError) { + NESTED_THROW(HdfsIOException, + "OutputStreamImpl: failed to complete file %s.", + path.c_str()); + } else { + return; + } + } + + if (closeTimeout > 0) { + steady_clock::time_point end = steady_clock::now(); + + if (ToMilliSeconds(start, end) >= closeTimeout) { + if (throwError) { + THROW(HdfsIOException, + "OutputStreamImpl: timeout when complete file %s, timeout interval %d ms.", + path.c_str(), closeTimeout); + } else { + return; + } + } + } + + try { + sleep_for(milliseconds(400)); + } catch (...) { + } + } +} + +/** + * close the stream. + */ +void OutputStreamImpl::closePipeline() { + lock_guard < mutex > lock(mut); + + if (!pipeline) { + return; + } + + if (currentPacket) { + sendPacket(currentPacket); + } + + currentPacket = packets.getPacket(packetSize, chunksPerPacket, bytesWritten, nextSeqNo++, + checksumSize); + + if (syncBlock) { + currentPacket->setSyncFlag(syncBlock); + } + + lastBlock = pipeline->close(currentPacket); + assert(lastBlock); + currentPacket.reset(); + pipeline.reset(); + filesystem->fsync(path); + bytesWritten = 0; +} + +void OutputStreamImpl::close() { + exception_ptr e; + + if (closed) { + return; + } + + try { + //pipeline may be broken + if (!lastError) { + if (lastFlushed != cursor && position > 0) { + appendChunkToPacket(&buffer[0], position); + } + + if (lastFlushed != cursor && currentPacket) { + sendPacket(currentPacket); + } + + closePipeline(); + /*heartBeatStop = true; + condHeartBeatSender.notify_all(); + + if (heartBeatSender.joinable()) { + heartBeatSender.join(); + }*/ + completeFile(true); + } + } catch (...) 
{ + e = current_exception(); + } + + LeaseRenewer::GetLeaseRenewer().StopRenew(filesystem); + LOG(DEBUG3, "close file %s for write with length %" PRId64, path.c_str(), cursor); + reset(); + + if (e) { + rethrow_exception(e); + } +} + +void OutputStreamImpl::reset() { + blockSize = 0; + bytesWritten = 0; + checksum->reset(); + chunkSize = 0; + chunksPerPacket = 0; + closed = true; + closeTimeout = 0; + conf.reset(); + currentPacket.reset(); + cursor = 0; + filesystem.reset(); + heartBeatInterval = 0; + isAppend = false; + lastBlock.reset(); + lastError = exception_ptr(); + lastFlushed = 0; + nextSeqNo = 0; + packetSize = 0; + path.clear(); + pipeline.reset(); + position = 0; + replication = 0; + syncBlock = false; +} + +std::string OutputStreamImpl::toString() { + if (path.empty()) { + return std::string("OutputStream (not opened)"); + } else { + return std::string("OutputStream for path ") + path; + } +} + +/*void OutputStreamImpl::heartBeatSenderRoutine() { + assert(heartBeatStop == false); + + while (!heartBeatStop) { + try { + unique_lock < mutex > lock(mut); + condHeartBeatSender.wait_for(lock, milliseconds(1000)); + + try { + try { + if (pipeline + && ToMilliSeconds(lastSend, steady_clock::now()) + >= heartBeatInterval) { + pipeline->send(shared_ptr < Packet > (new Packet())); + lastSend = steady_clock::now(); + } + } catch (...) { + NESTED_THROW(Hdfs::HdfsIOException, "Failed to send heart beat, path: %s", + path.c_str()); + } + } catch (...) { + lastError = current_exception(); + throw; + } + } catch (const std::bad_alloc & e) { + + * keep quiet if we run out of memory, since writing log need memory, + * that may cause the process terminated. + + break; + } catch (const Hdfs::HdfsException & e) { + LOG(LOG_ERROR, "Heart beat thread exit since %s", + GetExceptionDetail(e)); + } catch (const std::exception & e) { + LOG(LOG_ERROR, "Heart beat thread exit since %s", + e.what()); + } + } + + heartBeatStop = true; +}*/ + +} +}
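A minimal, self-contained sketch of the sizing arithmetic in computePacketChunkSize() above may help when reading the write path. The defaults used here (512-byte chunks, 4-byte CRC32C checksums, a 64 KiB target packet size) and the 25-byte header length are assumptions for illustration only; in libhdfs3 the real values come from SessionConfig and PacketHeader::GetPkgHeaderSize().

#include <algorithm>
#include <cstdio>

// Standalone illustration of OutputStreamImpl::computePacketChunkSize():
// round the target packet size to a whole number of (chunk + checksum) slots.
int main() {
    int chunkSize = 512;          // assumed: bytes of user data per checksum chunk
    int checksumSize = 4;         // sizeof(int32_t) for CRC32C
    int packetSize = 64 * 1024;   // assumed: target packet size before rounding
    int packetHeaderSize = 25;    // placeholder; real value is PacketHeader::GetPkgHeaderSize()

    int chunkSizeWithChecksum = chunkSize + checksumSize;
    // Ceiling division: how many full chunk+checksum slots fit into the target size.
    int chunksPerPacket = (packetSize - packetHeaderSize + chunkSizeWithChecksum - 1)
                          / chunkSizeWithChecksum;
    chunksPerPacket = std::max(chunksPerPacket, 1);
    // Effective packet size: whole chunks plus the header.
    packetSize = chunksPerPacket * chunkSizeWithChecksum + packetHeaderSize;

    std::printf("chunksPerPacket = %d, packetSize = %d\n", chunksPerPacket, packetSize);
    return 0;   // prints chunksPerPacket = 127, packetSize = 65557 with these inputs
}

With these inputs a packet carries 127 chunks of 512 bytes, which is why appendInternal() sends the current packet as soon as currentPacket->isFull() reports that the chunk count has been reached.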
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/OutputStreamImpl.h ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/src/client/OutputStreamImpl.h b/depends/libhdfs3/src/client/OutputStreamImpl.h new file mode 100644 index 0000000..4967e0f --- /dev/null +++ b/depends/libhdfs3/src/client/OutputStreamImpl.h @@ -0,0 +1,173 @@ +/******************************************************************** + * Copyright (c) 2013 - 2014, Pivotal Inc. + * All rights reserved. + * + * Author: Zhanwei Wang + ********************************************************************/ +/******************************************************************** + * 2014 - + * open source under Apache License Version 2.0 + ********************************************************************/ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef _HDFS_LIBHDFS3_CLIENT_OUTPUTSTREAMIMPL_H_ +#define _HDFS_LIBHDFS3_CLIENT_OUTPUTSTREAMIMPL_H_ + +#include "Atomic.h" +#include "Checksum.h" +#include "DateTime.h" +#include "ExceptionInternal.h" +#include "FileSystem.h" +#include "Memory.h" +#include "OutputStreamInter.h" +#include "PacketPool.h" +#include "Permission.h" +#include "Pipeline.h" +#include "server/LocatedBlock.h" +#include "SessionConfig.h" +#include "Thread.h" +#ifdef MOCK +#include "PipelineStub.h" +#endif + +namespace Hdfs { +namespace Internal { +/** + * A output stream used to write data to hdfs. + */ +class OutputStreamImpl: public OutputStreamInter { +public: + OutputStreamImpl(); + + ~OutputStreamImpl(); + + /** + * To create or append a file. + * @param fs hdfs file system. + * @param path the file path. + * @param flag creation flag, can be Create, Append or Create|Overwrite. + * @param permission create a new file with given permission. + * @param createParent if the parent does not exist, create it. + * @param replication create a file with given number of replication. + * @param blockSize create a file with given block size. + */ + void open(shared_ptr<FileSystemInter> fs, const char * path, int flag, + const Permission & permission, bool createParent, int replication, + int64_t blockSize); + + /** + * To append data to file. + * @param buf the data used to append. + * @param size the data size. + */ + void append(const char * buf, int64_t size); + + /** + * Flush all data in buffer and waiting for ack. + * Will block until get all acks. + */ + void flush(); + + /** + * return the current file length. + * @return current file length. + */ + int64_t tell(); + + /** + * @ref OutputStream::sync + */ + void sync(); + + /** + * close the stream. + */ + void close(); + + /** + * Output a readable string of this output stream. 
+ */ + std::string toString(); + + /** + * Keep the last error of this stream. + * @error the error to be kept. + */ + void setError(const exception_ptr & error); + +private: + void appendChunkToPacket(const char * buf, int size); + void appendInternal(const char * buf, int64_t size); + void checkStatus(); + void closePipeline(); + void completeFile(bool throwError); + void computePacketChunkSize(); + void flushInternal(bool needSync); + //void heartBeatSenderRoutine(); + void initAppend(); + void openInternal(shared_ptr<FileSystemInter> fs, const char * path, int flag, + const Permission & permission, bool createParent, int replication, + int64_t blockSize); + void reset(); + void sendPacket(shared_ptr<Packet> packet); + void setupPipeline(); + +private: + //atomic<bool> heartBeatStop; + bool closed; + bool isAppend; + bool syncBlock; + //condition_variable condHeartBeatSender; + exception_ptr lastError; + int checksumSize; + int chunkSize; + int chunksPerPacket; + int closeTimeout; + int heartBeatInterval; + int packetSize; + int position; //cursor in buffer + int replication; + int64_t blockSize; //max size of block + int64_t bytesWritten; //the size of bytes has be written into packet (not include the data in chunk buffer). + int64_t cursor; //cursor in file. + int64_t lastFlushed; //the position last flushed + int64_t nextSeqNo; + mutex mut; + PacketPool packets; + shared_ptr<Checksum> checksum; + shared_ptr<FileSystemInter> filesystem; + shared_ptr<LocatedBlock> lastBlock; + shared_ptr<Packet> currentPacket; + shared_ptr<Pipeline> pipeline; + shared_ptr<SessionConfig> conf; + std::string path; + std::vector<char> buffer; + steady_clock::time_point lastSend; + //thread heartBeatSender; + + friend class Pipeline; +#ifdef MOCK +private: + Hdfs::Mock::PipelineStub * stub; +#endif +}; + +} +} + +#endif /* _HDFS_LIBHDFS3_CLIENT_OUTPUTSTREAMIMPL_H_ */ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/OutputStreamInter.h ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/src/client/OutputStreamInter.h b/depends/libhdfs3/src/client/OutputStreamInter.h new file mode 100644 index 0000000..9477a0d --- /dev/null +++ b/depends/libhdfs3/src/client/OutputStreamInter.h @@ -0,0 +1,98 @@ +/******************************************************************** + * Copyright (c) 2013 - 2014, Pivotal Inc. + * All rights reserved. + * + * Author: Zhanwei Wang + ********************************************************************/ +/******************************************************************** + * 2014 - + * open source under Apache License Version 2.0 + ********************************************************************/ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef _HDFS_LIBHDFS3_CLIENT_OUTPUTSTREAMINTER_H_ +#define _HDFS_LIBHDFS3_CLIENT_OUTPUTSTREAMINTER_H_ + +#include "ExceptionInternal.h" +#include "FileSystemInter.h" +#include "Memory.h" +#include "Permission.h" + +namespace Hdfs { +namespace Internal { + +/** + * A output stream used to write data to hdfs. + */ +class OutputStreamInter { +public: + virtual ~OutputStreamInter() { + } + + /** + * To create or append a file. + * @param fs hdfs file system. + * @param path the file path. + * @param flag creation flag, can be Create, Append or Create|Overwrite. + * @param permission create a new file with given permission. + * @param createParent if the parent does not exist, create it. + * @param replication create a file with given number of replication. + * @param blockSize create a file with given block size. + */ + virtual void open(shared_ptr<FileSystemInter> fs, const char * path, int flag, + const Permission & permission, bool createParent, int replication, + int64_t blockSize) = 0; + + /** + * To append data to file. + * @param buf the data used to append. + * @param size the data size. + */ + virtual void append(const char * buf, int64_t size) = 0; + + /** + * Flush all data in buffer and waiting for ack. + * Will block until get all acks. + */ + virtual void flush() = 0; + + /** + * return the current file length. + * @return current file length. + */ + virtual int64_t tell() = 0; + + /** + * @ref OutputStream::sync + */ + virtual void sync() = 0; + + /** + * close the stream. + */ + virtual void close() = 0; + + virtual std::string toString() = 0; + + virtual void setError(const exception_ptr & error) = 0; +}; + +} +} + +#endif /* _HDFS_LIBHDFS3_CLIENT_OUTPUTSTREAMINTER_H_ */ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/Packet.cpp ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/src/client/Packet.cpp b/depends/libhdfs3/src/client/Packet.cpp new file mode 100644 index 0000000..894e6bf --- /dev/null +++ b/depends/libhdfs3/src/client/Packet.cpp @@ -0,0 +1,156 @@ +/******************************************************************** + * Copyright (c) 2013 - 2014, Pivotal Inc. + * All rights reserved. + * + * Author: Zhanwei Wang + ********************************************************************/ +/******************************************************************** + * 2014 - + * open source under Apache License Version 2.0 + ********************************************************************/ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "BigEndian.h" +#include "Exception.h" +#include "ExceptionInternal.h" +#include "Packet.h" +#include "PacketHeader.h" + +namespace Hdfs { +namespace Internal { + +Packet::Packet() : + lastPacketInBlock(false), syncBlock(false), checksumPos(0), checksumSize(0), + checksumStart(0), dataPos(0), dataStart(0), headerStart(0), maxChunks( + 0), numChunks(0), offsetInBlock(0), seqno(HEART_BEAT_SEQNO) { + buffer.resize(PacketHeader::GetPkgHeaderSize()); +} + +Packet::Packet(int pktSize, int chunksPerPkt, int64_t offsetInBlock, + int64_t seqno, int checksumSize) : + lastPacketInBlock(false), syncBlock(false), checksumSize(checksumSize), headerStart(0), + maxChunks(chunksPerPkt), numChunks(0), offsetInBlock(offsetInBlock), seqno(seqno), buffer(pktSize) { + checksumPos = checksumStart = PacketHeader::GetPkgHeaderSize(); + dataPos = dataStart = checksumStart + chunksPerPkt * checksumSize; + assert(dataPos >= 0); +} + +void Packet::reset(int pktSize, int chunksPerPkt, int64_t offsetInBlock, + int64_t seqno, int checksumSize) { + lastPacketInBlock = false; + syncBlock = false; + this->checksumSize = checksumSize; + headerStart = 0; + maxChunks = chunksPerPkt; + numChunks = 0; + this->offsetInBlock = offsetInBlock; + this->seqno = seqno; + checksumPos = checksumStart = PacketHeader::GetPkgHeaderSize(); + dataPos = dataStart = checksumStart + chunksPerPkt * checksumSize; + + if (pktSize > static_cast<int>(buffer.size())) { + buffer.resize(pktSize); + } + + assert(dataPos >= 0); +} + +void Packet::addChecksum(uint32_t checksum) { + if (checksumPos + static_cast<int>(sizeof(uint32_t)) > dataStart) { + THROW(HdfsIOException, + "Packet: failed to add checksum into packet, checksum is too large"); + } + + WriteBigEndian32ToArray(checksum, &buffer[checksumPos]); + checksumPos += checksumSize; +} + +void Packet::addData(const char * buf, int size) { + if (size + dataPos > static_cast<int>(buffer.size())) { + THROW(HdfsIOException, + "Packet: failed add data to packet, packet size is too small"); + } + + memcpy(&buffer[dataPos], buf, size); + dataPos += size; + assert(dataPos >= 0); +} + +void Packet::setSyncFlag(bool sync) { + syncBlock = sync; +} + +void Packet::increaseNumChunks() { + ++numChunks; +} + +bool Packet::isFull() { + return numChunks >= maxChunks; +} + +bool Packet::isHeartbeat() { + return HEART_BEAT_SEQNO == seqno; +} + +void Packet::setLastPacketInBlock(bool lastPacket) { + lastPacketInBlock = lastPacket; +} + +int Packet::getDataSize() { + return dataPos - dataStart; +} + +int64_t Packet::getLastByteOffsetBlock() { + assert(offsetInBlock >= 0 && dataPos >= dataStart); + assert(dataPos - dataStart <= maxChunks * static_cast<int>(buffer.size())); + return offsetInBlock + dataPos - dataStart; +} + +const ConstPacketBuffer Packet::getBuffer() { + /* + * Once this is called, no more data can be added to the packet. + * This is called only when the packet is ready to be sent. + */ + int dataLen = dataPos - dataStart; + int checksumLen = checksumPos - checksumStart; + + if (checksumPos != dataStart) { + /* + * move the checksum to cover the gap. + * This can happen for the last packet. + */ + memmove(&buffer[dataStart - checksumLen], &buffer[checksumStart], + checksumLen); + headerStart = dataStart - checksumPos; + checksumStart += dataStart - checksumPos; + checksumPos = dataStart; + } + + assert(dataPos >= 0); + int pktLen = dataLen + checksumLen; + PacketHeader header(pktLen + sizeof(int32_t) + /* why we add 4 bytes? Because the server will reduce 4 bytes. 
-_-*/ + , offsetInBlock, seqno, lastPacketInBlock, dataLen); + header.writeInBuffer(&buffer[headerStart], + PacketHeader::GetPkgHeaderSize()); + return ConstPacketBuffer(&buffer[headerStart], + PacketHeader::GetPkgHeaderSize() + pktLen); +} + +} +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/Packet.h ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/src/client/Packet.h b/depends/libhdfs3/src/client/Packet.h new file mode 100644 index 0000000..d51bd1b --- /dev/null +++ b/depends/libhdfs3/src/client/Packet.h @@ -0,0 +1,131 @@ +/******************************************************************** + * Copyright (c) 2013 - 2014, Pivotal Inc. + * All rights reserved. + * + * Author: Zhanwei Wang + ********************************************************************/ +/******************************************************************** + * 2014 - + * open source under Apache License Version 2.0 + ********************************************************************/ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef _HDFS_LIBHDFS3_CLIENT_PACKET_H_ +#define _HDFS_LIBHDFS3_CLIENT_PACKET_H_ + +#include <stdint.h> +#include <vector> + +#define HEART_BEAT_SEQNO -1 + +namespace Hdfs { +namespace Internal { + +class ConstPacketBuffer { +public: + ConstPacketBuffer(const char * buf, int size) : + buffer(buf), size(size) { + } + + const char * getBuffer() const { + return buffer; + } + + const int getSize() const { + return size; + } + +private: + const char * buffer; + const int size; +}; + +/** + * buffer is pointed into like follows: + * (C is checksum data, D is payload data) + * + * [HHHHHCCCCC________________DDDDDDDDDDDDDDDD___] + * ^ ^ ^ ^ + * | checksumPos dataStart dataPos + * checksumStart + */ +class Packet { +public: + /** + * create a heart beat packet + */ + Packet(); + + /** + * create a new packet + */ + Packet(int pktSize, int chunksPerPkt, int64_t offsetInBlock, int64_t seqno, int checksumSize); + + void reset(int pktSize, int chunksPerPkt, int64_t offsetInBlock, int64_t seqno, int checksumSize); + + void addChecksum(uint32_t checksum); + + void addData(const char * buf, int size); + + void setSyncFlag(bool sync); + + void increaseNumChunks(); + + bool isFull(); + + bool isHeartbeat(); + + void setLastPacketInBlock(bool lastPacket); + + int getDataSize(); + + const ConstPacketBuffer getBuffer(); + + int64_t getLastByteOffsetBlock(); + + int64_t getSeqno() const { + return seqno; + } + + bool isLastPacketInBlock() const { + return lastPacketInBlock; + } + + int64_t getOffsetInBlock() const { + return offsetInBlock; + } + +private: + bool lastPacketInBlock; // is this the last packet in block + bool syncBlock; // sync block to disk? + int checksumPos; + int checksumSize; + int checksumStart; + int dataPos; + int dataStart; + int headerStart; + int maxChunks; // max chunks in packet + int numChunks; // number of chunks currently in packet + int64_t offsetInBlock; // offset in block + int64_t seqno; // sequence number of packet in block + std::vector<char> buffer; +}; + +} +} +#endif /* _HDFS_LIBHDFS3_CLIENT_PACKET_H_ */ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/PacketHeader.cpp ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/src/client/PacketHeader.cpp b/depends/libhdfs3/src/client/PacketHeader.cpp new file mode 100644 index 0000000..b0bd687 --- /dev/null +++ b/depends/libhdfs3/src/client/PacketHeader.cpp @@ -0,0 +1,126 @@ +/******************************************************************** + * Copyright (c) 2013 - 2014, Pivotal Inc. + * All rights reserved. + * + * Author: Zhanwei Wang + ********************************************************************/ +/******************************************************************** + * 2014 - + * open source under Apache License Version 2.0 + ********************************************************************/ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "BigEndian.h" +#include "Exception.h" +#include "ExceptionInternal.h" +#include "PacketHeader.h" + +namespace Hdfs { +namespace Internal { + +int PacketHeader::PkgHeaderSize = PacketHeader::CalcPkgHeaderSize(); + +int PacketHeader::CalcPkgHeaderSize() { + PacketHeaderProto header; + header.set_offsetinblock(0); + header.set_datalen(0); + header.set_lastpacketinblock(false); + header.set_seqno(0); + return header.ByteSize() + sizeof(int32_t) /*packet length*/ + sizeof(int16_t)/* proto length */; +} + +int PacketHeader::GetPkgHeaderSize() { + return PkgHeaderSize; +} + +PacketHeader::PacketHeader() : + packetLen(0) { +} + +PacketHeader::PacketHeader(int packetLen, int64_t offsetInBlock, int64_t seqno, + bool lastPacketInBlock, int dataLen) : + packetLen(packetLen) { + proto.set_offsetinblock(offsetInBlock); + proto.set_seqno(seqno); + proto.set_lastpacketinblock(lastPacketInBlock); + proto.set_datalen(dataLen); +} + +int PacketHeader::getDataLen() { + return proto.datalen(); +} + +bool PacketHeader::isLastPacketInBlock() { + return proto.lastpacketinblock(); +} + +bool PacketHeader::sanityCheck(int64_t lastSeqNo) { + // We should only have a non-positive data length for the last packet + if (proto.datalen() <= 0 && !proto.lastpacketinblock()) + return false; + + // The last packet should not contain data + if (proto.lastpacketinblock() && proto.datalen() != 0) + return false; + + // Seqnos should always increase by 1 with each packet received + if (proto.seqno() != lastSeqNo + 1) + return false; + + return true; +} + +int64_t PacketHeader::getSeqno() { + return proto.seqno(); +} + +int64_t PacketHeader::getOffsetInBlock() { + return proto.offsetinblock(); +} + +int PacketHeader::getPacketLen() { + return packetLen; +} + +void PacketHeader::readFields(const char * buf, size_t size) { + int16_t protoLen; + assert(size > sizeof(packetLen) + sizeof(protoLen)); + packetLen = ReadBigEndian32FromArray(buf); + protoLen = ReadBigEndian16FromArray(buf + sizeof(packetLen)); + + if (packetLen < static_cast<int>(sizeof(int32_t)) || protoLen < 0 + || static_cast<int>(sizeof(packetLen) + sizeof(protoLen)) + protoLen > static_cast<int>(size)) { + THROW(HdfsIOException, "Invalid PacketHeader, packetLen is %d, protoLen is %hd, buf size is %zu", packetLen, + protoLen, size); + } + + if (!proto.ParseFromArray(buf + sizeof(packetLen) + sizeof(protoLen), + protoLen)) { + THROW(HdfsIOException, + "PacketHeader cannot parse PacketHeaderProto from datanode response."); + } +} + +void PacketHeader::writeInBuffer(char * buf, size_t size) { + buf = WriteBigEndian32ToArray(packetLen, buf); + buf = WriteBigEndian16ToArray(proto.ByteSize(), buf); + proto.SerializeToArray(buf, size - sizeof(int32_t) - sizeof(int16_t)); +} + +} +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/PacketHeader.h ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/src/client/PacketHeader.h b/depends/libhdfs3/src/client/PacketHeader.h new file mode 100644 index 0000000..940c0c4 --- /dev/null +++ 
b/depends/libhdfs3/src/client/PacketHeader.h @@ -0,0 +1,68 @@ +/******************************************************************** + * Copyright (c) 2013 - 2014, Pivotal Inc. + * All rights reserved. + * + * Author: Zhanwei Wang + ********************************************************************/ +/******************************************************************** + * 2014 - + * open source under Apache License Version 2.0 + ********************************************************************/ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef _HDFS_LIBHDFS3_CLIENT_PACKETHEADER_H_ +#define _HDFS_LIBHDFS3_CLIENT_PACKETHEADER_H_ + +#include "datatransfer.pb.h" + +namespace Hdfs { +namespace Internal { + +class PacketHeader { +public: + PacketHeader(); + PacketHeader(int packetLen, int64_t offsetInBlock, int64_t seqno, + bool lastPacketInBlock, int dataLen); + bool isLastPacketInBlock(); + bool sanityCheck(int64_t lastSeqNo); + int getDataLen(); + int getPacketLen(); + int64_t getOffsetInBlock(); + int64_t getSeqno(); + void readFields(const char * buf, size_t size); + /** + * Write the header into the buffer. + * This requires that PKT_HEADER_LEN bytes are available. + */ + void writeInBuffer(char * buf, size_t size); + +public: + static int GetPkgHeaderSize(); + static int CalcPkgHeaderSize(); + +private: + static int PkgHeaderSize; +private: + int32_t packetLen; + PacketHeaderProto proto; +}; + +} +} + +#endif /* _HDFS_LIBHDFS3_CLIENT_PACKETHEADER_H_ */ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/PacketPool.cpp ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/src/client/PacketPool.cpp b/depends/libhdfs3/src/client/PacketPool.cpp new file mode 100644 index 0000000..828c1ec --- /dev/null +++ b/depends/libhdfs3/src/client/PacketPool.cpp @@ -0,0 +1,63 @@ +/******************************************************************** + * Copyright (c) 2013 - 2014, Pivotal Inc. + * All rights reserved. + * + * Author: Zhanwei Wang + ********************************************************************/ +/******************************************************************** + * 2014 - + * open source under Apache License Version 2.0 + ********************************************************************/ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "Logger.h" +#include "Packet.h" +#include "PacketPool.h" + +namespace Hdfs { +namespace Internal { + +PacketPool::PacketPool(int size) : + maxSize(size) { +} + +shared_ptr<Packet> PacketPool::getPacket(int pktSize, int chunksPerPkt, + int64_t offsetInBlock, int64_t seqno, int checksumSize) { + if (packets.empty()) { + return shared_ptr<Packet>( + new Packet(pktSize, chunksPerPkt, offsetInBlock, seqno, + checksumSize)); + } else { + shared_ptr<Packet> retval = packets.front(); + packets.pop_front(); + retval->reset(pktSize, chunksPerPkt, offsetInBlock, seqno, + checksumSize); + return retval; + } +} + +void PacketPool::relesePacket(shared_ptr<Packet> packet) { + if (static_cast<int>(packets.size()) >= maxSize) { + return; + } + + packets.push_back(packet); +} + +} +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/PacketPool.h ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/src/client/PacketPool.h b/depends/libhdfs3/src/client/PacketPool.h new file mode 100644 index 0000000..6194911 --- /dev/null +++ b/depends/libhdfs3/src/client/PacketPool.h @@ -0,0 +1,71 @@ +/******************************************************************** + * Copyright (c) 2013 - 2014, Pivotal Inc. + * All rights reserved. + * + * Author: Zhanwei Wang + ********************************************************************/ +/******************************************************************** + * 2014 - + * open source under Apache License Version 2.0 + ********************************************************************/ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef _HDFS_LIBHDFS3_CLIENT_PACKETPOOL_H_ +#define _HDFS_LIBHDFS3_CLIENT_PACKETPOOL_H_ +#include "Memory.h" + +#include <deque> + +namespace Hdfs { +namespace Internal { + +class Packet; + +/* + * A simple packet pool implementation. + * + * Packet is created here if no packet is available. + * And then add to Pipeline's packet queue to wait for the ack. + * The Pipeline's packet queue size is not larger than the PacketPool's max size, + * otherwise the write operation will be pending for the ack. + * Once the ack is received, packet will reutrn back to the PacketPool to reuse. 
+ */ +class PacketPool { +public: + PacketPool(int size); + shared_ptr<Packet> getPacket(int pktSize, int chunksPerPkt, + int64_t offsetInBlock, int64_t seqno, int checksumSize); + void relesePacket(shared_ptr<Packet> packet); + + void setMaxSize(int size) { + this->maxSize = size; + } + + int getMaxSize() const { + return maxSize; + } + +private: + int maxSize; + std::deque<shared_ptr<Packet> > packets; +}; + +} +} + +#endif /* _HDFS_LIBHDFS3_CLIENT_PACKETPOOL_H_ */ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/PeerCache.cpp ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/src/client/PeerCache.cpp b/depends/libhdfs3/src/client/PeerCache.cpp new file mode 100644 index 0000000..98884fe --- /dev/null +++ b/depends/libhdfs3/src/client/PeerCache.cpp @@ -0,0 +1,82 @@ +/******************************************************************** + * Copyright (c) 2013 - 2014, Pivotal Inc. + * All rights reserved. + * + * Author: Zhanwei Wang + ********************************************************************/ +/******************************************************************** + * 2014 - + * open source under Apache License Version 2.0 + ********************************************************************/ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include <inttypes.h> + +#include "client/PeerCache.h" + +namespace Hdfs { +namespace Internal { + +LruMap<std::string, PeerCache::value_type> PeerCache::Map; + +PeerCache::PeerCache(const SessionConfig& conf) + : cacheSize(conf.getSocketCacheCapacity()), + expireTimeInterval(conf.getSocketCacheExpiry()) { + Map.setMaxSize(cacheSize); +} + +std::string PeerCache::buildKey(const DatanodeInfo& datanode) { + std::stringstream ss; + ss.imbue(std::locale::classic()); + ss << datanode.getIpAddr() << datanode.getXferPort() + << datanode.getDatanodeId(); + return ss.str(); +} + +shared_ptr<Socket> PeerCache::getConnection(const DatanodeInfo& datanode) { + std::string key = buildKey(datanode); + value_type value; + int64_t elipsed; + + if (!Map.findAndErase(key, &value)) { + LOG(DEBUG1, "PeerCache miss for datanode %s uuid(%s).", + datanode.formatAddress().c_str(), datanode.getDatanodeId().c_str()); + return shared_ptr<Socket>(); + } else if ((elipsed = ToMilliSeconds(value.second, steady_clock::now())) > + expireTimeInterval) { + LOG(DEBUG1, "PeerCache expire for datanode %s uuid(%s).", + datanode.formatAddress().c_str(), datanode.getDatanodeId().c_str()); + return shared_ptr<Socket>(); + } + + LOG(DEBUG1, "PeerCache hit for datanode %s uuid(%s), elipsed %" PRId64, + datanode.formatAddress().c_str(), datanode.getDatanodeId().c_str(), + elipsed); + return value.first; +} + +void PeerCache::addConnection(shared_ptr<Socket> peer, + const DatanodeInfo& datanode) { + std::string key = buildKey(datanode); + value_type value(peer, steady_clock::now()); + Map.insert(key, value); + LOG(DEBUG1, "PeerCache add for datanode %s uuid(%s).", + datanode.formatAddress().c_str(), datanode.getDatanodeId().c_str()); +} +} +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/PeerCache.h ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/src/client/PeerCache.h b/depends/libhdfs3/src/client/PeerCache.h new file mode 100644 index 0000000..6c2352c --- /dev/null +++ b/depends/libhdfs3/src/client/PeerCache.h @@ -0,0 +1,65 @@ +/******************************************************************** + * Copyright (c) 2013 - 2014, Pivotal Inc. + * All rights reserved. + * + * Author: Zhanwei Wang + ********************************************************************/ +/******************************************************************** + * 2014 - + * open source under Apache License Version 2.0 + ********************************************************************/ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef _HDFS_LIBHDFS3_CLIENT_PEERCACHE_H_ +#define _HDFS_LIBHDFS3_CLIENT_PEERCACHE_H_ + +#include <string> +#include <utility> + +#include "common/DateTime.h" +#include "common/LruMap.h" +#include "common/Memory.h" +#include "common/SessionConfig.h" +#include "network/Socket.h" +#include "server/DatanodeInfo.h" + +namespace Hdfs { +namespace Internal { + +class PeerCache { + public: + explicit PeerCache(const SessionConfig& conf); + + shared_ptr<Socket> getConnection(const DatanodeInfo& datanode); + + void addConnection(shared_ptr<Socket> peer, const DatanodeInfo& datanode); + + typedef std::pair<shared_ptr<Socket>, steady_clock::time_point> value_type; + + private: + std::string buildKey(const DatanodeInfo& datanode); + + private: + const int cacheSize; + int64_t expireTimeInterval; // milliseconds + static LruMap<std::string, value_type> Map; +}; +} +} + +#endif /* _HDFS_LIBHDFS3_CLIENT_PEERCACHE_H_ */ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/Permission.cpp ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/src/client/Permission.cpp b/depends/libhdfs3/src/client/Permission.cpp new file mode 100644 index 0000000..09db3ee --- /dev/null +++ b/depends/libhdfs3/src/client/Permission.cpp @@ -0,0 +1,48 @@ +/******************************************************************** + * Copyright (c) 2013 - 2014, Pivotal Inc. + * All rights reserved. + * + * Author: Zhanwei Wang + ********************************************************************/ +/******************************************************************** + * 2014 - + * open source under Apache License Version 2.0 + ********************************************************************/ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "Permission.h" + +#include "Exception.h" +#include "ExceptionInternal.h" + +namespace Hdfs { + +Permission::Permission(uint16_t mode) { + if (mode >> 10) { + THROW(InvalidParameter, + "Invalid parameter: cannot convert %u to \"Permission\"", + static_cast<unsigned int>(mode)); + } + + userAction = (Action)((mode >> 6) & 7); + groupAction = (Action)((mode >> 3) & 7); + otherAction = (Action)(mode & 7); + stickyBit = (((mode >> 9) & 1) == 1); +} + +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/Permission.h ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/src/client/Permission.h b/depends/libhdfs3/src/client/Permission.h new file mode 100644 index 0000000..895868d --- /dev/null +++ b/depends/libhdfs3/src/client/Permission.h @@ -0,0 +1,224 @@ +/******************************************************************** + * Copyright (c) 2013 - 2014, Pivotal Inc. + * All rights reserved. + * + * Author: Zhanwei Wang + ********************************************************************/ +/******************************************************************** + * 2014 - + * open source under Apache License Version 2.0 + ********************************************************************/ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef _HDFS_LIBHDFS3_CLIENT_PERMISSION_H_ +#define _HDFS_LIBHDFS3_CLIENT_PERMISSION_H_ + +#include <string> + +namespace Hdfs { + +/** + * Action is used to describe a action the user is permitted to apply on a file. + */ +enum Action { + NONE, //("---"), + EXECUTE, //("--x"), + WRITE, //("-w-"), + WRITE_EXECUTE, //("-wx"), + READ, //("r--"), + READ_EXECUTE, //("r-x"), + READ_WRITE, //("rw-"), + ALL //("rwx"); +}; + +/** + * To test Action a if implies Action b + * @param a Action to be tested. + * @param b Action target. + * @return return true if a implies b. + */ +static inline bool implies(const Action & a, const Action & b) { + return (a & b) == b; +} + +/** + * To construct a new Action using a and b + * @param a Action to be used. + * @param b Action to be used. + * @return return a new Action. + */ +static inline Action operator &(const Action & a, const Action & b) { + return (Action)(((unsigned int) a) & (unsigned int) b); +} +/** + * To construct a new Action using a or b + * @param a Action to be used. + * @param b Action to be used. + * @return return a new Action. + */ +static inline Action operator |(const Action & a, const Action & b) { + return (Action)(((unsigned int) a) | (unsigned int) b); +} +/** + * To construct a new Action of complementary of a given Action + * @param a Action to be used. 
+ * @return return a new Action + */ +static inline Action operator ~(const Action & a) { + return (Action)(7 - (unsigned int) a); +} + +/** + * To convert a Action to a readable string. + * @param a the Action to be convert. + * @return a readable string + */ +static inline std::string toString(const Action & a) { + switch (a) { + case NONE: + return "---"; + + case EXECUTE: + return "--x"; + + case WRITE: + return "-w-"; + + case WRITE_EXECUTE: + return "-wx"; + + case READ: + return "r--"; + + case READ_EXECUTE: + return "r-x"; + + case READ_WRITE: + return "rw-"; + + case ALL: + return "rwx"; + } +} + +/** + * Permission is used to describe a file permission. + */ +class Permission { +public: + /** + * To construct a Permission. + * @param u owner permission. + * @param g group permission. + * @param o other user permission. + */ + Permission(const Action & u, const Action & g, const Action & o) : + userAction(u), groupAction(g), otherAction(o), stickyBit(false) { + } + + /** + * To construct a Permission from a uint16. + * @param mode permission flag. + */ + Permission(uint16_t mode); + +public: + /** + * To get group permission + * @return the group permission + */ + Action getGroupAction() const { + return groupAction; + } + + /** + * To set group permission + * @param groupAction the group permission + */ + void setGroupAction(Action groupAction) { + this->groupAction = groupAction; + } + + /** + * To get other user permission + * @return other user permission + */ + Action getOtherAction() const { + return otherAction; + } + + /** + * To set other user permission + * @param otherAction other user permission + */ + void setOtherAction(Action otherAction) { + this->otherAction = otherAction; + } + + /** + * To get owner permission + * @return the owner permission + */ + Action getUserAction() const { + return userAction; + } + + /** + * To set owner permission + * @param userAction the owner permission + */ + void setUserAction(Action userAction) { + this->userAction = userAction; + } + + /** + * To convert a Permission to a readable string + * @return a readable string + */ + std::string toString() const { + return Hdfs::toString(userAction) + Hdfs::toString(groupAction) + + Hdfs::toString(otherAction); + } + + /** + * To convert a Permission to a uint16 flag + * @return a uint16 flag + */ + uint16_t toShort() const { + return (uint16_t)((((uint16_t) userAction) << 6) + + (((uint16_t) groupAction) << 3) + (((uint16_t) otherAction)) + + ((stickyBit ? 1 << 9 : 0))); + } + + bool operator ==(const Permission & other) const { + return userAction == other.userAction + && groupAction == other.groupAction + && otherAction == other.otherAction + && stickyBit == other.stickyBit; + } + +private: + Action userAction; + Action groupAction; + Action otherAction; + + bool stickyBit; +}; + +} +#endif /* _HDFS_LIBHDFS3_CLIENT_PERMISSION_H_ */
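The bit layout that Permission(uint16_t) and toShort() above agree on can be checked with a small self-contained sketch (not libhdfs3 code): the sticky bit sits at bit 9 and the user, group and other Action fields occupy three bits each, so a familiar octal mode such as 0644 round-trips through the same shifts used above.

#include <cstdint>
#include <cstdio>
#include <string>

// Maps a 3-bit Action value (NONE .. ALL) to its rwx string, as in toString(Action).
static std::string actionToString(unsigned a) {
    static const char * names[] = {"---", "--x", "-w-", "-wx", "r--", "r-x", "rw-", "rwx"};
    return names[a & 7];
}

int main() {
    uint16_t mode = 0644;                 // octal: user rw-, group r--, other r--
    unsigned user = (mode >> 6) & 7;      // same unpacking as Permission(uint16_t)
    unsigned group = (mode >> 3) & 7;
    unsigned other = mode & 7;
    bool sticky = ((mode >> 9) & 1) == 1;

    // Pack the fields back the way Permission::toShort() does.
    uint16_t packed = static_cast<uint16_t>((user << 6) | (group << 3) | other
                                            | (sticky ? 1 << 9 : 0));

    std::printf("%s%s%s (0%o), round trip ok: %s\n",
                actionToString(user).c_str(), actionToString(group).c_str(),
                actionToString(other).c_str(), packed, packed == mode ? "yes" : "no");
    return 0;                             // prints rw-r--r-- (0644), round trip ok: yes
}

The constructor rejects any mode with bits set above bit 9 (the mode >> 10 check), which matches this layout: sticky plus three 3-bit fields is all toShort() ever produces.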