http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/Pipeline.cpp ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/src/client/Pipeline.cpp b/depends/libhdfs3/src/client/Pipeline.cpp new file mode 100644 index 0000000..3396e31 --- /dev/null +++ b/depends/libhdfs3/src/client/Pipeline.cpp @@ -0,0 +1,792 @@ +/******************************************************************** + * Copyright (c) 2013 - 2014, Pivotal Inc. + * All rights reserved. + * + * Author: Zhanwei Wang + ********************************************************************/ +/******************************************************************** + * 2014 - + * open source under Apache License Version 2.0 + ********************************************************************/ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "DateTime.h" +#include "Pipeline.h" +#include "Logger.h" +#include "Exception.h" +#include "ExceptionInternal.h" +#include "OutputStreamInter.h" +#include "FileSystemInter.h" +#include "DataTransferProtocolSender.h" +#include "datatransfer.pb.h" + +#include <inttypes.h> + +namespace Hdfs { +namespace Internal { + +PipelineImpl::PipelineImpl(bool append, const char * path, const SessionConfig & conf, + shared_ptr<FileSystemInter> filesystem, int checksumType, int chunkSize, + int replication, int64_t bytesSent, PacketPool & packetPool, shared_ptr<LocatedBlock> lastBlock) : + checksumType(checksumType), chunkSize(chunkSize), errorIndex(-1), replication(replication), bytesAcked( + bytesSent), bytesSent(bytesSent), packetPool(packetPool), filesystem(filesystem), lastBlock(lastBlock), path( + path) { + canAddDatanode = conf.canAddDatanode(); + blockWriteRetry = conf.getBlockWriteRetry(); + connectTimeout = conf.getOutputConnTimeout(); + readTimeout = conf.getOutputReadTimeout(); + writeTimeout = conf.getOutputWriteTimeout(); + clientName = filesystem->getClientName(); + + if (append) { + LOG(DEBUG2, "create pipeline for file %s to append to %s at position %" PRId64, + path, lastBlock->toString().c_str(), lastBlock->getNumBytes()); + stage = PIPELINE_SETUP_APPEND; + assert(lastBlock); + nodes = lastBlock->getLocations(); + storageIDs = lastBlock->getStorageIDs(); + buildForAppendOrRecovery(false); + stage = DATA_STREAMING; + } else { + LOG(DEBUG2, "create pipeline for file %s to write to a new block", path); + stage = PIPELINE_SETUP_CREATE; + buildForNewBlock(); + stage = DATA_STREAMING; + } +} + +int PipelineImpl::findNewDatanode(const std::vector<DatanodeInfo> & original) { + if (nodes.size() != original.size() + 1) { + THROW(HdfsIOException, "Failed to acquire a datanode for block %s from namenode.", + lastBlock->toString().c_str()); + } + + for (size_t 
i = 0; i < nodes.size(); i++) { + size_t j = 0; + + for (; j < original.size() && !(nodes[i] == original[j]); j++) + ; + + if (j == original.size()) { + return i; + } + } + + THROW(HdfsIOException, "Cannot add new datanode for block %s.", lastBlock->toString().c_str()); +} + +void PipelineImpl::transfer(const ExtendedBlock & blk, const DatanodeInfo & src, + const std::vector<DatanodeInfo> & targets, const Token & token) { + shared_ptr<Socket> so(new TcpSocketImpl); + shared_ptr<BufferedSocketReader> in(new BufferedSocketReaderImpl(*so)); + so->connect(src.getIpAddr().c_str(), src.getXferPort(), connectTimeout); + DataTransferProtocolSender sender(*so, writeTimeout, src.formatAddress()); + sender.transferBlock(blk, token, clientName.c_str(), targets); + int size; + size = in->readVarint32(readTimeout); + std::vector<char> buf(size); + in->readFully(&buf[0], size, readTimeout); + BlockOpResponseProto resp; + + if (!resp.ParseFromArray(&buf[0], size)) { + THROW(HdfsIOException, "cannot parse datanode response from %s fro block %s.", + src.formatAddress().c_str(), lastBlock->toString().c_str()); + } + + if (Status::DT_PROTO_SUCCESS != resp.status()) { + THROW(HdfsIOException, "Failed to transfer block to a new datanode %s for block %s.", + targets[0].formatAddress().c_str(), + lastBlock->toString().c_str()); + } +} + +bool PipelineImpl::addDatanodeToPipeline(const std::vector<DatanodeInfo> & excludedNodes) { + try { + /* + * get a new datanode + */ + std::vector<DatanodeInfo> original = nodes; + shared_ptr<LocatedBlock> lb; + lb = filesystem->getAdditionalDatanode(path, *lastBlock, nodes, storageIDs, + excludedNodes, 1); + nodes = lb->getLocations(); + storageIDs = lb->getStorageIDs(); + + /* + * failed to add new datanode into pipeline. + */ + if (original.size() == nodes.size()) { + LOG(LOG_ERROR, + "Failed to add new datanode into pipeline for block: %s file %s.", + lastBlock->toString().c_str(), path.c_str()); + } else { + /* + * find the new datanode + */ + int d = findNewDatanode(original); + /* + * in case transfer block fail. + */ + errorIndex = d; + /* + * transfer replica + */ + DatanodeInfo & src = d == 0 ? 
nodes[1] : nodes[d - 1]; + std::vector<DatanodeInfo> targets; + targets.push_back(nodes[d]); + LOG(INFO, "Replicate block %s from %s to %s for file %s.", lastBlock->toString().c_str(), + src.formatAddress().c_str(), targets[0].formatAddress().c_str(), path.c_str()); + transfer(*lastBlock, src, targets, lb->getToken()); + errorIndex = -1; + return true; + } + } catch (const HdfsCanceled & e) { + throw; + } catch (const HdfsFileSystemClosed & e) { + throw; + } catch (const SafeModeException & e) { + throw; + } catch (const HdfsException & e) { + std::string buffer; + LOG(LOG_ERROR, + "Failed to add a new datanode into pipeline for block: %s file %s.\n%s", + lastBlock->toString().c_str(), path.c_str(), GetExceptionDetail(e, buffer)); + } + + return false; +} + +void PipelineImpl::checkPipelineWithReplicas() { + if (static_cast<int>(nodes.size()) < replication) { + std::stringstream ss; + ss.imbue(std::locale::classic()); + int size = nodes.size(); + + for (int i = 0; i < size - 1; ++i) { + ss << nodes[i].formatAddress() << ", "; + } + + if (nodes.empty()) { + ss << "Empty"; + } else { + ss << nodes.back().formatAddress(); + } + + LOG(WARNING, + "the number of nodes in pipeline is %d [%s], is less than the expected number of replica %d for block %s file %s", + static_cast<int>(nodes.size()), ss.str().c_str(), replication, + lastBlock->toString().c_str(), path.c_str()); + } +} + +void PipelineImpl::buildForAppendOrRecovery(bool recovery) { + int64_t gs = 0; + int retry = blockWriteRetry; + exception_ptr lastException; + std::vector<DatanodeInfo> excludedNodes; + shared_ptr<LocatedBlock> lb; + std::string buffer; + + do { + /* + * Remove bad datanode from list of datanodes. + * If errorIndex was not set (i.e. appends), then do not remove + * any datanodes + */ + if (errorIndex >= 0) { + assert(lastBlock); + LOG(LOG_ERROR, "Pipeline: node %s is invalid and removed from pipeline when %s block %s for file %s, stage = %s.", + nodes[errorIndex].formatAddress().c_str(), + (recovery ? "recovery" : "append to"), lastBlock->toString().c_str(), + path.c_str(), StageToString(stage)); + excludedNodes.push_back(nodes[errorIndex]); + nodes.erase(nodes.begin() + errorIndex); + + if (!storageIDs.empty()) { + storageIDs.erase(storageIDs.begin() + errorIndex); + } + + if (nodes.empty()) { + THROW(HdfsIOException, + "Build pipeline to %s block %s failed: all datanodes are bad.", + (recovery ? "recovery" : "append to"), lastBlock->toString().c_str()); + } + + errorIndex = -1; + } + + try { + gs = 0; + + /* + * Check if the number of datanodes in pipeline satisfy the replication requirement, + * add new datanode if not + */ + if (stage != PIPELINE_SETUP_CREATE && stage != PIPELINE_CLOSE + && static_cast<int>(nodes.size()) < replication && canAddDatanode) { + if (!addDatanodeToPipeline(excludedNodes)) { + THROW(HdfsIOException, + "Failed to add new datanode into pipeline for block: %s file %s, " + "set \"output.replace-datanode-on-failure\" to \"false\" to disable this feature.", + lastBlock->toString().c_str(), path.c_str()); + } + } + + if (errorIndex >= 0) { + continue; + } + + checkPipelineWithReplicas(); + /* + * Update generation stamp and access token + */ + lb = filesystem->updateBlockForPipeline(*lastBlock); + gs = lb->getGenerationStamp(); + /* + * Try to build pipeline + */ + createBlockOutputStream(lb->getToken(), gs, recovery); + /* + * everything is ok, reset errorIndex. 
+ */ + errorIndex = -1; + lastException = exception_ptr(); + break; //break on success + } catch (const HdfsInvalidBlockToken & e) { + lastException = current_exception(); + recovery = true; + LOG(LOG_ERROR, + "Pipeline: Failed to build pipeline for block %s file %s, new generation stamp is %" PRId64 ",\n%s", + lastBlock->toString().c_str(), path.c_str(), gs, GetExceptionDetail(e, buffer)); + LOG(INFO, "Try to recovery pipeline for block %s file %s.", + lastBlock->toString().c_str(), path.c_str()); + } catch (const HdfsTimeoutException & e) { + lastException = current_exception(); + recovery = true; + LOG(LOG_ERROR, + "Pipeline: Failed to build pipeline for block %s file %s, new generation stamp is %" PRId64 ",\n%s", + lastBlock->toString().c_str(), path.c_str(), gs, GetExceptionDetail(e, buffer)); + LOG(INFO, "Try to recovery pipeline for block %s file %s.", + lastBlock->toString().c_str(), path.c_str()); + } catch (const HdfsIOException & e) { + lastException = current_exception(); + /* + * Set recovery flag to true in case of failed to create a pipeline for appending. + */ + recovery = true; + LOG(LOG_ERROR, + "Pipeline: Failed to build pipeline for block %s file %s, new generation stamp is %" PRId64 ",\n%s", + lastBlock->toString().c_str(), path.c_str(), gs, GetExceptionDetail(e, buffer)); + LOG(INFO, "Try to recovery pipeline for block %s file %s.", lastBlock->toString().c_str(), path.c_str()); + } + + /* + * we don't known what happened, no datanode is reported failure, reduce retry count in case infinite loop. + * it may caused by rpc call throw HdfsIOException + */ + if (errorIndex < 0) { + --retry; + } + } while (retry > 0); + + if (lastException) { + rethrow_exception(lastException); + } + + /* + * Update pipeline at the namenode, non-idempotent RPC call. + */ + lb->setPoolId(lastBlock->getPoolId()); + lb->setBlockId(lastBlock->getBlockId()); + lb->setLocations(nodes); + lb->setStorageIDs(storageIDs); + lb->setNumBytes(lastBlock->getNumBytes()); + lb->setOffset(lastBlock->getOffset()); + filesystem->updatePipeline(*lastBlock, *lb, nodes, storageIDs); + lastBlock = lb; +} + +void PipelineImpl::locateNextBlock( + const std::vector<DatanodeInfo> & excludedNodes) { + milliseconds sleeptime(100); + milliseconds fiveSeconds(5000); + int retry = blockWriteRetry; + + while (true) { + try { + lastBlock = filesystem->addBlock(path, lastBlock.get(), + excludedNodes); + assert(lastBlock); + return; + } catch (const NotReplicatedYetException & e) { + LOG(DEBUG1, "Got NotReplicatedYetException when try to addBlock for block %s, " + "already retry %d times, max retry %d times", lastBlock->toString().c_str(), + blockWriteRetry - retry, blockWriteRetry); + + if (retry--) { + try { + sleep_for(sleeptime); + } catch (...) { + } + + sleeptime *= 2; + sleeptime = sleeptime < fiveSeconds ? 
sleeptime : fiveSeconds; + } else { + throw; + } + } + } +} + +static std::string FormatExcludedNodes( + const std::vector<DatanodeInfo> & excludedNodes) { + std::stringstream ss; + ss.imbue(std::locale::classic()); + ss << "["; + int size = excludedNodes.size(); + + for (int i = 0; i < size - 1; ++i) { + ss << excludedNodes[i].formatAddress() << ", "; + } + + if (excludedNodes.empty()) { + ss << "Empty"; + } else { + ss << excludedNodes.back().formatAddress(); + } + + ss << "]"; + return ss.str(); +} + +void PipelineImpl::buildForNewBlock() { + int retryAllocNewBlock = 0, retry = blockWriteRetry; + LocatedBlock lb; + std::vector<DatanodeInfo> excludedNodes; + shared_ptr<LocatedBlock> block = lastBlock; + std::string buffer; + + do { + errorIndex = -1; + lastBlock = block; + + try { + locateNextBlock(excludedNodes); + lastBlock->setNumBytes(0); + nodes = lastBlock->getLocations(); + storageIDs = lastBlock->getStorageIDs(); + } catch (const HdfsRpcException & e) { + const char * lastBlockName = lastBlock ? lastBlock->toString().c_str() : "Null"; + LOG(LOG_ERROR, + "Failed to allocate a new empty block for file %s, last block %s, excluded nodes %s.\n%s", + path.c_str(), lastBlockName, FormatExcludedNodes(excludedNodes).c_str(), GetExceptionDetail(e, buffer)); + + if (retryAllocNewBlock > blockWriteRetry) { + throw; + } + + LOG(INFO, "Retry to allocate a new empty block for file %s, last block %s, excluded nodes %s.", + path.c_str(), lastBlockName, FormatExcludedNodes(excludedNodes).c_str()); + ++retryAllocNewBlock; + continue; + } catch (const HdfsException & e) { + const char * lastBlockName = lastBlock ? lastBlock->toString().c_str() : "Null"; + LOG(LOG_ERROR, + "Failed to allocate a new empty block for file %s, last block %s, excluded nodes %s.\n%s", + path.c_str(), lastBlockName, FormatExcludedNodes(excludedNodes).c_str(), GetExceptionDetail(e, buffer)); + throw; + } + + retryAllocNewBlock = 0; + checkPipelineWithReplicas(); + + if (nodes.empty()) { + THROW(HdfsIOException, + "No datanode is available to create a pipeline for block %s file %s.", + lastBlock->toString().c_str(), path.c_str()); + } + + try { + createBlockOutputStream(lastBlock->getToken(), 0, false); + break; //break on success + } catch (const HdfsInvalidBlockToken & e) { + LOG(LOG_ERROR, + "Failed to setup the pipeline for new block %s file %s.\n%s", + lastBlock->toString().c_str(), path.c_str(), GetExceptionDetail(e, buffer)); + } catch (const HdfsTimeoutException & e) { + LOG(LOG_ERROR, + "Failed to setup the pipeline for new block %s file %s.\n%s", + lastBlock->toString().c_str(), path.c_str(), GetExceptionDetail(e, buffer)); + } catch (const HdfsIOException & e) { + LOG(LOG_ERROR, + "Failed to setup the pipeline for new block %s file %s.\n%s", + lastBlock->toString().c_str(), path.c_str(), GetExceptionDetail(e, buffer)); + } + + LOG(INFO, "Abandoning block: %s for file %s.", lastBlock->toString().c_str(), path.c_str()); + + try { + filesystem->abandonBlock(*lastBlock, path); + } catch (const HdfsException & e) { + LOG(LOG_ERROR, + "Failed to abandon useless block %s for file %s.\n%s", + lastBlock->toString().c_str(), path.c_str(), GetExceptionDetail(e, buffer)); + throw; + } + + if (errorIndex >= 0) { + LOG(INFO, "Excluding invalid datanode: %s for block %s for file %s", + nodes[errorIndex].formatAddress().c_str(), lastBlock->toString().c_str(), path.c_str()); + excludedNodes.push_back(nodes[errorIndex]); + } else { + /* + * we don't known what happened, no datanode is reported failure, reduce retry count in case of 
infinite loop. + */ + --retry; + } + } while (retry); +} + +/* + * bad link node must be either empty or a "IP:PORT" + */ +void PipelineImpl::checkBadLinkFormat(const std::string & n) { + std::string node = n; + + if (node.empty()) { + return; + } + + do { + const char * host = &node[0], *port; + size_t pos = node.find_last_of(":"); + + if (pos == node.npos || pos + 1 == node.length()) { + break; + } + + node[pos] = 0; + port = &node[pos + 1]; + struct addrinfo hints, *addrs; + memset(&hints, 0, sizeof(hints)); + hints.ai_family = PF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_NUMERICHOST | AI_NUMERICSERV; + int p; + char * end; + p = strtol(port, &end, 0); + + if (p >= 65536 || p <= 0 || end != port + strlen(port)) { + break; + } + + if (getaddrinfo(host, port, &hints, &addrs)) { + break; + } + + freeaddrinfo(addrs); + return; + } while (0); + + LOG(FATAL, "Cannot parser the firstBadLink string %s, it should be a bug or protocol incompatible.", + n.c_str()); + THROW(HdfsException, "Cannot parser the firstBadLink string %s, it should be a bug or protocol incompatible.", + n.c_str()); +} + +void PipelineImpl::createBlockOutputStream(const Token & token, int64_t gs, bool recovery) { + std::string firstBadLink; + exception_ptr lastError; + bool needWrapException = true; + + try { + sock = shared_ptr < Socket > (new TcpSocketImpl); + reader = shared_ptr<BufferedSocketReader>(new BufferedSocketReaderImpl(*sock)); + sock->connect(nodes[0].getIpAddr().c_str(), nodes[0].getXferPort(), + connectTimeout); + std::vector<DatanodeInfo> targets; + + for (size_t i = 1; i < nodes.size(); ++i) { + targets.push_back(nodes[i]); + } + + DataTransferProtocolSender sender(*sock, writeTimeout, + nodes[0].formatAddress()); + sender.writeBlock(*lastBlock, token, clientName.c_str(), targets, + (recovery ? (stage | 0x1) : stage), nodes.size(), + lastBlock->getNumBytes(), bytesSent, gs, checksumType, chunkSize); + int size; + size = reader->readVarint32(readTimeout); + std::vector<char> buf(size); + reader->readFully(&buf[0], size, readTimeout); + BlockOpResponseProto resp; + + if (!resp.ParseFromArray(&buf[0], size)) { + THROW(HdfsIOException, "cannot parse datanode response from %s for block %s.", + nodes[0].formatAddress().c_str(), lastBlock->toString().c_str()); + } + + Status pipelineStatus = resp.status(); + firstBadLink = resp.firstbadlink(); + + if (Status::DT_PROTO_SUCCESS != pipelineStatus) { + needWrapException = false; + + if (Status::DT_PROTO_ERROR_ACCESS_TOKEN == pipelineStatus) { + THROW(HdfsInvalidBlockToken, + "Got access token error for connect ack with firstBadLink as %s for block %s", + firstBadLink.c_str(), lastBlock->toString().c_str()); + } else { + THROW(HdfsIOException, "Bad connect ack with firstBadLink as %s for block %s", + firstBadLink.c_str(), lastBlock->toString().c_str()); + } + } + + return; + } catch (...) { + errorIndex = 0; + lastError = current_exception(); + } + + checkBadLinkFormat(firstBadLink); + + if (!firstBadLink.empty()) { + for (size_t i = 0; i < nodes.size(); ++i) { + if (nodes[i].getXferAddr() == firstBadLink) { + errorIndex = i; + break; + } + } + } + + assert(lastError); + + if (!needWrapException) { + rethrow_exception(lastError); + } + + try { + rethrow_exception(lastError); + } catch (const HdfsException & e) { + NESTED_THROW(HdfsIOException, + "Cannot create block output stream for block %s, " + "recovery flag: %s, with last generate stamp %" PRId64 ".", + lastBlock->toString().c_str(), (recovery ? 
"true" : "false"), gs); + } +} + +void PipelineImpl::resend() { + assert(stage != PIPELINE_CLOSE); + + for (size_t i = 0; i < packets.size(); ++i) { + ConstPacketBuffer b = packets[i]->getBuffer(); + sock->writeFully(b.getBuffer(), b.getSize(), writeTimeout); + int64_t tmp = packets[i]->getLastByteOffsetBlock(); + bytesSent = bytesSent > tmp ? bytesSent : tmp; + } +} + +void PipelineImpl::send(shared_ptr<Packet> packet) { + ConstPacketBuffer buffer = packet->getBuffer(); + + if (!packet->isHeartbeat()) { + packets.push_back(packet); + } + + /* + * too many packets pending on the ack. wait in case of consuming to much memory. + */ + if (static_cast<int>(packets.size()) > packetPool.getMaxSize()) { + waitForAcks(false); + } + + bool failover = false; + + do { + try { + if (failover) { + resend(); + } else { + assert(sock); + sock->writeFully(buffer.getBuffer(), buffer.getSize(), + writeTimeout); + int64_t tmp = packet->getLastByteOffsetBlock(); + bytesSent = bytesSent > tmp ? bytesSent : tmp; + } + + checkResponse(false); + return; + } catch (const HdfsIOException & e) { + if (errorIndex < 0) { + errorIndex = 0; + } + + sock.reset(); + } + + buildForAppendOrRecovery(true); + failover = true; + + if (stage == PIPELINE_CLOSE) { + assert(packets.size() == 1 && packets[0]->isLastPacketInBlock()); + packets.clear(); + break; + } + } while (true); +} + +void PipelineImpl::processAck(PipelineAck & ack) { + assert(!ack.isInvalid()); + int64_t seqno = ack.getSeqno(); + + if (HEART_BEAT_SEQNO == seqno) { + return; + } + + assert(!packets.empty()); + Packet & packet = *packets[0]; + + if (ack.isSuccess()) { + if (packet.getSeqno() != seqno) { + THROW(HdfsIOException, + "processAck: pipeline ack expecting seqno %" PRId64 " but received %" PRId64 " for block %s.", + packet.getSeqno(), seqno, lastBlock->toString().c_str()); + } + + int64_t tmp = packet.getLastByteOffsetBlock(); + bytesAcked = tmp > bytesAcked ? tmp : bytesAcked; + assert(lastBlock); + lastBlock->setNumBytes(bytesAcked); + + if (packet.isLastPacketInBlock()) { + sock.reset(); + } + + packetPool.relesePacket(packets[0]); + packets.pop_front(); + } else { + for (int i = ack.getNumOfReplies() - 1; i >= 0; --i) { + if (Status::DT_PROTO_SUCCESS != ack.getReply(i)) { + errorIndex = i; + /* + * handle block token expire as same as HdfsIOException. + */ + THROW(HdfsIOException, + "processAck: ack report error at node: %s for block %s.", + nodes[i].formatAddress().c_str(), lastBlock->toString().c_str()); + } + } + } +} + +void PipelineImpl::processResponse() { + PipelineAck ack; + std::vector<char> buf; + int size = reader->readVarint32(readTimeout); + ack.reset(); + buf.resize(size); + reader->readFully(&buf[0], size, readTimeout); + ack.readFrom(&buf[0], size); + + if (ack.isInvalid()) { + THROW(HdfsIOException, + "processAllAcks: get an invalid DataStreamer packet ack for block %s", + lastBlock->toString().c_str()); + } + + processAck(ack); +} + +void PipelineImpl::checkResponse(bool wait) { + int timeout = wait ? readTimeout : 0; + bool readable = reader->poll(timeout); + + if (readable) { + processResponse(); + } else if (wait) { + THROW(HdfsIOException, "Timeout when reading response for block %s, datanode %s do not response.", + lastBlock->toString().c_str(), + nodes[0].formatAddress().c_str()); + } +} + +void PipelineImpl::flush() { + waitForAcks(true); +} + +void PipelineImpl::waitForAcks(bool force) { + bool failover = false; + + while (!packets.empty()) { + /* + * just wait for some acks in case of consuming too much memory. 
+ */ + if (!force && static_cast<int>(packets.size()) < packetPool.getMaxSize()) { + return; + } + + try { + if (failover) { + resend(); + } + + checkResponse(true); + failover = false; + } catch (const HdfsIOException & e) { + if (errorIndex < 0) { + errorIndex = 0; + } + + std::string buffer; + LOG(LOG_ERROR, + "Failed to flush pipeline on datanode %s for block %s file %s.\n%s", + nodes[errorIndex].formatAddress().c_str(), lastBlock->toString().c_str(), + path.c_str(), GetExceptionDetail(e, buffer)); + LOG(INFO, "Rebuild pipeline to flush for block %s file %s.", lastBlock->toString().c_str(), path.c_str()); + sock.reset(); + failover = true; + } + + if (failover) { + buildForAppendOrRecovery(true); + + if (stage == PIPELINE_CLOSE) { + assert(packets.size() == 1 && packets[0]->isLastPacketInBlock()); + packets.clear(); + break; + } + } + } +} + +shared_ptr<LocatedBlock> PipelineImpl::close(shared_ptr<Packet> lastPacket) { + waitForAcks(true); + lastPacket->setLastPacketInBlock(true); + stage = PIPELINE_CLOSE; + send(lastPacket); + waitForAcks(true); + sock.reset(); + lastBlock->setNumBytes(bytesAcked); + LOG(DEBUG2, "close pipeline for file %s, block %s with length %" PRId64, + path.c_str(), lastBlock->toString().c_str(), + lastBlock->getNumBytes()); + return lastBlock; +} + +} +}
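
Note on the write path above: PipelineImpl::send() keeps every unacknowledged packet in a deque so that, after a datanode failure, resend() can replay them against a rebuilt pipeline, while bytesSent and bytesAcked track how far the write and the acknowledgements have progressed. A minimal standalone sketch of that ack-window bookkeeping follows (simplified stand-in types, no sockets or protobuf; it is illustrative only and not the libhdfs3 API):

// Sketch of the bookkeeping behind PipelineImpl::send()/resend()/processAck().
// FakePacket and AckWindow are hypothetical stand-ins for Packet and the pipeline.
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <deque>
#include <iostream>
#include <memory>

struct FakePacket {
    int64_t seqno;
    int64_t lastByteOffsetInBlock;  // offset of the last byte this packet carries
};

class AckWindow {
public:
    // "Send" a packet: keep it until its ack arrives and advance bytesSent.
    void send(std::shared_ptr<FakePacket> p) {
        pending.push_back(p);
        bytesSent = std::max(bytesSent, p->lastByteOffsetInBlock);
    }

    // Replay every unacknowledged packet, e.g. after the pipeline was rebuilt
    // (the actual socket write is omitted here).
    void resend() {
        for (const auto & p : pending) {
            bytesSent = std::max(bytesSent, p->lastByteOffsetInBlock);
        }
    }

    // Process a successful ack for the oldest pending packet.
    void ack(int64_t seqno) {
        assert(!pending.empty() && pending.front()->seqno == seqno);
        bytesAcked = std::max(bytesAcked, pending.front()->lastByteOffsetInBlock);
        pending.pop_front();
    }

    int64_t acked() const { return bytesAcked; }
    int64_t sent() const { return bytesSent; }

private:
    std::deque<std::shared_ptr<FakePacket> > pending;
    int64_t bytesAcked = 0;  // highest offset confirmed by acks
    int64_t bytesSent = 0;   // highest offset written to the pipeline
};

int main() {
    AckWindow w;
    w.send(std::make_shared<FakePacket>(FakePacket{0, 65536}));
    w.send(std::make_shared<FakePacket>(FakePacket{1, 131072}));
    w.ack(0);  // first packet acknowledged; the second is still pending
    std::cout << "acked=" << w.acked() << " sent=" << w.sent() << std::endl;
    return 0;
}

This mirrors why flush()/waitForAcks() can bound memory use: packets stay queued only until processAck() releases them back to the PacketPool.
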
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/Pipeline.h ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/src/client/Pipeline.h b/depends/libhdfs3/src/client/Pipeline.h new file mode 100644 index 0000000..5e30dd6 --- /dev/null +++ b/depends/libhdfs3/src/client/Pipeline.h @@ -0,0 +1,203 @@ +/******************************************************************** + * Copyright (c) 2013 - 2014, Pivotal Inc. + * All rights reserved. + * + * Author: Zhanwei Wang + ********************************************************************/ +/******************************************************************** + * 2014 - + * open source under Apache License Version 2.0 + ********************************************************************/ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef _HDFS_LIBHDFS3_CLIENT_PIPELINE_H_ +#define _HDFS_LIBHDFS3_CLIENT_PIPELINE_H_ + +#include "FileSystemInter.h" +#include "Memory.h" +#include "network/BufferedSocketReader.h" +#include "network/TcpSocket.h" +#include "Packet.h" +#include "PacketPool.h" +#include "PipelineAck.h" +#include "server/DatanodeInfo.h" +#include "server/LocatedBlock.h" +#include "server/Namenode.h" +#include "SessionConfig.h" +#include "Thread.h" + +#include <vector> +#include <deque> + +namespace Hdfs { +namespace Internal { + +enum BlockConstructionStage { + /** + * The enumerates are always listed as regular stage followed by the + * recovery stage. + * Changing this order will make getRecoveryStage not working. 
+ */ + // pipeline set up for block append + PIPELINE_SETUP_APPEND = 0, + // pipeline set up for failed PIPELINE_SETUP_APPEND recovery + PIPELINE_SETUP_APPEND_RECOVERY = 1, + // data streaming + DATA_STREAMING = 2, + // pipeline setup for failed data streaming recovery + PIPELINE_SETUP_STREAMING_RECOVERY = 3, + // close the block and pipeline + PIPELINE_CLOSE = 4, + // Recover a failed PIPELINE_CLOSE + PIPELINE_CLOSE_RECOVERY = 5, + // pipeline set up for block creation + PIPELINE_SETUP_CREATE = 6 +}; + +static inline const char * StageToString(BlockConstructionStage stage) { + switch (stage) { + case PIPELINE_SETUP_APPEND: + return "PIPELINE_SETUP_APPEND"; + + case PIPELINE_SETUP_APPEND_RECOVERY: + return "PIPELINE_SETUP_APPEND_RECOVERY"; + + case DATA_STREAMING: + return "DATA_STREAMING"; + + case PIPELINE_SETUP_STREAMING_RECOVERY: + return "PIPELINE_SETUP_STREAMING_RECOVERY"; + + case PIPELINE_CLOSE: + return "PIPELINE_CLOSE"; + + case PIPELINE_CLOSE_RECOVERY: + return "PIPELINE_CLOSE_RECOVERY"; + + case PIPELINE_SETUP_CREATE: + return "PIPELINE_SETUP_CREATE"; + + default: + return "UNKNOWN STAGE"; + } +} + +class Packet; +class OutputStreamImpl; + +/** + * setup, data transfer, close, and failover. + */ +class Pipeline { +public: + + virtual ~Pipeline() {} + + /** + * send all data and wait for all ack. + */ + virtual void flush() = 0; + + /** + * send LastPacket and close the pipeline. + */ + virtual shared_ptr<LocatedBlock> close(shared_ptr<Packet> lastPacket) = 0; + + /** + * send a packet, retry on error until fatal. + * @param packet + */ + virtual void send(shared_ptr<Packet> packet) = 0; +}; + +class PipelineImpl : public Pipeline { +public: + /** + * construct and setup the pipeline for append. + */ + PipelineImpl(bool append, const char * path, const SessionConfig & conf, + shared_ptr<FileSystemInter> filesystem, int checksumType, int chunkSize, + int replication, int64_t bytesSent, PacketPool & packetPool, + shared_ptr<LocatedBlock> lastBlock); + + /** + * send all data and wait for all ack. + */ + void flush(); + + /** + * send LastPacket and close the pipeline. + */ + shared_ptr<LocatedBlock> close(shared_ptr<Packet> lastPacket); + + /** + * send a packet, retry on error until fatal. + * @param packet + */ + void send(shared_ptr<Packet> packet); + +private: + bool addDatanodeToPipeline(const std::vector<DatanodeInfo> & excludedNodes); + void buildForAppendOrRecovery(bool recovery); + void buildForNewBlock(); + void checkPipelineWithReplicas(); + void checkResponse(bool wait); + void createBlockOutputStream(const Token & token, int64_t gs, bool recovery); + void locateNextBlock(const std::vector<DatanodeInfo> & excludedNodes); + void processAck(PipelineAck & ack); + void processResponse(); + void resend(); + void waitForAcks(bool force); + void transfer(const ExtendedBlock & blk, const DatanodeInfo & src, + const std::vector<DatanodeInfo> & targets, + const Token & token); + int findNewDatanode(const std::vector<DatanodeInfo> & original); + +private: + static void checkBadLinkFormat(const std::string & node); + +private: + BlockConstructionStage stage; + bool canAddDatanode; + int blockWriteRetry; + int checksumType; + int chunkSize; + int connectTimeout; + int errorIndex; + int readTimeout; + int replication; + int writeTimeout; + int64_t bytesAcked; //the size of bytes the ack received. + int64_t bytesSent; //the size of bytes has sent. 
+ PacketPool & packetPool; + shared_ptr<BufferedSocketReader> reader; + shared_ptr<FileSystemInter> filesystem; + shared_ptr<LocatedBlock> lastBlock; + shared_ptr<Socket> sock; + std::deque<shared_ptr<Packet> > packets; + std::string clientName; + std::string path; + std::vector<DatanodeInfo> nodes; + std::vector<std::string> storageIDs; + +}; + +} +} + +#endif /* _HDFS_LIBHDFS3_CLIENT_PIPELINE_H_ */ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/PipelineAck.h ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/src/client/PipelineAck.h b/depends/libhdfs3/src/client/PipelineAck.h new file mode 100644 index 0000000..3498ca9 --- /dev/null +++ b/depends/libhdfs3/src/client/PipelineAck.h @@ -0,0 +1,92 @@ +/******************************************************************** + * Copyright (c) 2013 - 2014, Pivotal Inc. + * All rights reserved. + * + * Author: Zhanwei Wang + ********************************************************************/ +/******************************************************************** + * 2014 - + * open source under Apache License Version 2.0 + ********************************************************************/ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef _HDFS_LIBHDFS3_CLIENT_PIPELINEACK_H_ +#define _HDFS_LIBHDFS3_CLIENT_PIPELINEACK_H_ + +#include "datatransfer.pb.h" + +namespace Hdfs { +namespace Internal { + +class PipelineAck { +public: + PipelineAck() : + invalid(true) { + } + + PipelineAck(const char * buf, int size) : + invalid(false) { + readFrom(buf, size); + } + + bool isInvalid() { + return invalid; + } + + int getNumOfReplies() { + return proto.status_size(); + } + + int64_t getSeqno() { + return proto.seqno(); + } + + Status getReply(int i) { + return proto.status(i); + } + + bool isSuccess() { + int size = proto.status_size(); + + for (int i = 0; i < size; ++i) { + if (Status::DT_PROTO_SUCCESS != proto.status(i)) { + return false; + } + } + + return true; + } + + void readFrom(const char * buf, int size) { + invalid = !proto.ParseFromArray(buf, size); + } + + void reset() { + proto.Clear(); + invalid = true; + } + +private: + PipelineAckProto proto; + bool invalid; +}; + +} +} + +#endif /* _HDFS_LIBHDFS3_CLIENT_PIPELINEACK_H_ */ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/ReadShortCircuitInfo.cpp ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/src/client/ReadShortCircuitInfo.cpp b/depends/libhdfs3/src/client/ReadShortCircuitInfo.cpp new file mode 100644 index 0000000..4aea4ee --- /dev/null +++ b/depends/libhdfs3/src/client/ReadShortCircuitInfo.cpp @@ -0,0 +1,364 @@ +/******************************************************************** + * Copyright (c) 2013 - 2014, Pivotal Inc. + * All rights reserved. + * + * Author: Zhanwei Wang + ********************************************************************/ +/******************************************************************** + * 2014 - + * open source under Apache License Version 2.0 + ********************************************************************/ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "client/DataTransferProtocolSender.h" +#include "ReadShortCircuitInfo.h" +#include "server/Datanode.h" +#include "datatransfer.pb.h" +#include "Exception.h" +#include "ExceptionInternal.h" +#include "network/DomainSocket.h" +#include "SWCrc32c.h" +#include "HWCrc32c.h" +#include "StringUtil.h" + +#include <inttypes.h> +#include <sstream> +#include <vector> + +namespace Hdfs { +namespace Internal { + +ReadShortCircuitFDCacheType + ReadShortCircuitInfoBuilder::ReadShortCircuitFDCache; +BlockLocalPathInfoCacheType + ReadShortCircuitInfoBuilder::BlockLocalPathInfoCache; + +ReadShortCircuitInfo::~ReadShortCircuitInfo() { + try { + dataFile.reset(); + metaFile.reset(); + ReadShortCircuitInfoBuilder::release(*this); + } catch (...) 
{ + } +} + +ReadShortCircuitFDHolder::~ReadShortCircuitFDHolder() { + if (metafd != -1) { + ::close(metafd); + } + + if (datafd != -1) { + ::close(datafd); + } +} + +ReadShortCircuitInfoBuilder::ReadShortCircuitInfoBuilder( + const DatanodeInfo& dnInfo, const RpcAuth& auth, const SessionConfig& conf) + : dnInfo(dnInfo), auth(auth), conf(conf) {} + +shared_ptr<ReadShortCircuitInfo> ReadShortCircuitInfoBuilder::fetchOrCreate( + const ExtendedBlock& block, const Token token) { + shared_ptr<ReadShortCircuitInfo> retval; + ReadShortCircuitInfoKey key(dnInfo.getXferPort(), block.getBlockId(), + block.getPoolId()); + + if (conf.isLegacyLocalBlockReader()) { + if (auth.getProtocol() != AuthProtocol::NONE) { + LOG(WARNING, + "Legacy read-shortcircuit only works for simple " + "authentication"); + return shared_ptr<ReadShortCircuitInfo>(); + } + + BlockLocalPathInfo info = getBlockLocalPathInfo(block, token); + assert(block.getBlockId() == info.getBlock().getBlockId() && + block.getPoolId() == info.getBlock().getPoolId()); + + if (0 != access(info.getLocalMetaPath(), R_OK)) { + invalidBlockLocalPathInfo(block); + LOG(WARNING, + "Legacy read-shortcircuit is enabled but path:%s is not " + "readable.", + info.getLocalMetaPath()); + return shared_ptr<ReadShortCircuitInfo>(); + } + + retval = createReadShortCircuitInfo(key, info); + } else { + shared_ptr<ReadShortCircuitFDHolder> fds; + + // find a pair available file descriptors in cache. + if (ReadShortCircuitFDCache.findAndErase(key, &fds)) { + try { + LOG(DEBUG1, + "Get file descriptors from cache for block %s, cache size %zu", + block.toString().c_str(), ReadShortCircuitFDCache.size()); + + return createReadShortCircuitInfo(key, fds); + } catch (...) { + // failed to create file wrapper from fds, retry with new fds. 
+ } + } + + // create a new one + retval = createReadShortCircuitInfo(key, block, token); + ReadShortCircuitFDCache.setMaxSize(conf.getMaxFileDescriptorCacheSize()); + } + + return retval; +} + +void ReadShortCircuitInfoBuilder::release(const ReadShortCircuitInfo& info) { + if (info.isValid() && !info.isLegacy()) { + ReadShortCircuitFDCache.insert(info.getKey(), info.getFdHolder()); + LOG(DEBUG1, + "Inserted file descriptors into cache for block %s, cache size %zu", + info.formatBlockInfo().c_str(), ReadShortCircuitFDCache.size()); + } +} + +BlockLocalPathInfo ReadShortCircuitInfoBuilder::getBlockLocalPathInfo( + const ExtendedBlock& block, const Token& token) { + BlockLocalPathInfo retval; + + ReadShortCircuitInfoKey key(dnInfo.getXferPort(), block.getBlockId(), + block.getPoolId()); + + try { + if (!BlockLocalPathInfoCache.find(key, &retval)) { + RpcAuth a = auth; + SessionConfig c = conf; + c.setRpcMaxRetryOnConnect(1); + + /* + * only kerberos based authentication is allowed, do not add + * token + */ + shared_ptr<Datanode> dn = shared_ptr<Datanode>(new DatanodeImpl( + dnInfo.getIpAddr().c_str(), dnInfo.getIpcPort(), c, a)); + dn->getBlockLocalPathInfo(block, token, retval); + + BlockLocalPathInfoCache.setMaxSize(conf.getMaxLocalBlockInfoCacheSize()); + BlockLocalPathInfoCache.insert(key, retval); + + LOG(DEBUG1, "Inserted block %s to local block info cache, cache size %zu", + block.toString().c_str(), BlockLocalPathInfoCache.size()); + } else { + LOG(DEBUG1, + "Get local block info from cache for block %s, cache size %zu", + block.toString().c_str(), BlockLocalPathInfoCache.size()); + } + } catch (const HdfsIOException& e) { + throw; + } catch (const HdfsException& e) { + NESTED_THROW(HdfsIOException, + "ReadShortCircuitInfoBuilder: Failed to get block local " + "path information."); + } + + return retval; +} + +void ReadShortCircuitInfoBuilder::invalidBlockLocalPathInfo( + const ExtendedBlock& block) { + BlockLocalPathInfoCache.erase(ReadShortCircuitInfoKey( + dnInfo.getXferPort(), block.getBlockId(), block.getPoolId())); +} + +shared_ptr<ReadShortCircuitInfo> +ReadShortCircuitInfoBuilder::createReadShortCircuitInfo( + const ReadShortCircuitInfoKey& key, const BlockLocalPathInfo& info) { + shared_ptr<FileWrapper> dataFile; + shared_ptr<FileWrapper> metaFile; + + std::string metaFilePath = info.getLocalMetaPath(); + std::string dataFilePath = info.getLocalBlockPath(); + + if (conf.doUseMappedFile()) { + metaFile = shared_ptr<MappedFileWrapper>(new MappedFileWrapper); + dataFile = shared_ptr<MappedFileWrapper>(new MappedFileWrapper); + } else { + metaFile = shared_ptr<CFileWrapper>(new CFileWrapper); + dataFile = shared_ptr<CFileWrapper>(new CFileWrapper); + } + + if (!metaFile->open(metaFilePath)) { + THROW(HdfsIOException, + "ReadShortCircuitInfoBuilder cannot open metadata file \"%s\", %s", + metaFilePath.c_str(), GetSystemErrorInfo(errno)); + } + + if (!dataFile->open(dataFilePath)) { + THROW(HdfsIOException, + "ReadShortCircuitInfoBuilder cannot open data file \"%s\", %s", + dataFilePath.c_str(), GetSystemErrorInfo(errno)); + } + + dataFile->seek(0); + metaFile->seek(0); + + shared_ptr<ReadShortCircuitInfo> retval(new ReadShortCircuitInfo(key, true)); + retval->setDataFile(dataFile); + retval->setMetaFile(metaFile); + return retval; +} + +std::string ReadShortCircuitInfoBuilder::buildDomainSocketAddress( + uint32_t port) { + std::string domainSocketPath = conf.getDomainSocketPath(); + + if (domainSocketPath.empty()) { + THROW(HdfsIOException, + "ReadShortCircuitInfoBuilder: 
\"dfs.domain.socket.path\" is not " + "set"); + } + + std::stringstream ss; + ss.imbue(std::locale::classic()); + ss << port; + StringReplaceAll(domainSocketPath, "_PORT", ss.str()); + + return domainSocketPath; +} + +shared_ptr<ReadShortCircuitInfo> +ReadShortCircuitInfoBuilder::createReadShortCircuitInfo( + const ReadShortCircuitInfoKey& key, const ExtendedBlock& block, + const Token& token) { + std::string addr = buildDomainSocketAddress(key.dnPort); + DomainSocketImpl sock; + sock.connect(addr.c_str(), 0, conf.getInputConnTimeout()); + DataTransferProtocolSender sender(sock, conf.getInputWriteTimeout(), addr); + sender.requestShortCircuitFds(block, token, MaxReadShortCircuitVersion); + shared_ptr<ReadShortCircuitFDHolder> fds = + receiveReadShortCircuitFDs(sock, block); + return createReadShortCircuitInfo(key, fds); +} + +shared_ptr<ReadShortCircuitFDHolder> +ReadShortCircuitInfoBuilder::receiveReadShortCircuitFDs( + Socket& sock, const ExtendedBlock& block) { + std::vector<char> respBuffer; + int readTimeout = conf.getInputReadTimeout(); + shared_ptr<BufferedSocketReader> in( + new BufferedSocketReaderImpl(sock, 0)); // disable buffer + int32_t respSize = in->readVarint32(readTimeout); + + if (respSize <= 0 || respSize > 10 * 1024 * 1024) { + THROW(HdfsIOException, + "ReadShortCircuitInfoBuilder get a invalid response size: %d, " + "Block: %s, " + "from Datanode: %s", + respSize, block.toString().c_str(), dnInfo.formatAddress().c_str()); + } + + respBuffer.resize(respSize); + in->readFully(&respBuffer[0], respSize, readTimeout); + BlockOpResponseProto resp; + + if (!resp.ParseFromArray(&respBuffer[0], respBuffer.size())) { + THROW(HdfsIOException, + "ReadShortCircuitInfoBuilder cannot parse BlockOpResponseProto " + "from " + "Datanode response, " + "Block: %s, from Datanode: %s", + block.toString().c_str(), dnInfo.formatAddress().c_str()); + } + + if (resp.status() != Status::DT_PROTO_SUCCESS) { + std::string msg; + + if (resp.has_message()) { + msg = resp.message(); + } + + if (resp.status() == Status::DT_PROTO_ERROR_ACCESS_TOKEN) { + THROW(HdfsInvalidBlockToken, + "ReadShortCircuitInfoBuilder: block's token is invalid. " + "Datanode: %s, Block: %s", + dnInfo.formatAddress().c_str(), block.toString().c_str()); + } else if (resp.status() == Status::DT_PROTO_ERROR_UNSUPPORTED) { + THROW(HdfsIOException, + "short-circuit read access is disabled for " + "DataNode %s. reason: %s", + dnInfo.formatAddress().c_str(), + (msg.empty() ? "check Datanode's log for more information" + : msg.c_str())); + } else { + THROW(HdfsIOException, + "ReadShortCircuitInfoBuilder: Datanode return an error when " + "sending read request to Datanode: %s, Block: %s, %s.", + dnInfo.formatAddress().c_str(), block.toString().c_str(), + (msg.empty() ? 
"check Datanode's log for more information" + : msg.c_str())); + } + } + + DomainSocketImpl* domainSocket = dynamic_cast<DomainSocketImpl*>(&sock); + + if (NULL == domainSocket) { + THROW(HdfsIOException, "Read short-circuit only works with Domain Socket"); + } + + shared_ptr<ReadShortCircuitFDHolder> fds(new ReadShortCircuitFDHolder); + + std::vector<int> tempFds(2, -1); + respBuffer.resize(1); + domainSocket->receiveFileDescriptors(&tempFds[0], tempFds.size(), + &respBuffer[0], respBuffer.size()); + + assert(tempFds[0] != -1 && "failed to receive data file descriptor"); + assert(tempFds[1] != -1 && "failed to receive metadata file descriptor"); + + fds->datafd = tempFds[0]; + fds->metafd = tempFds[1]; + + return fds; +} + +shared_ptr<ReadShortCircuitInfo> +ReadShortCircuitInfoBuilder::createReadShortCircuitInfo( + const ReadShortCircuitInfoKey& key, + const shared_ptr<ReadShortCircuitFDHolder>& fds) { + shared_ptr<FileWrapper> dataFile; + shared_ptr<FileWrapper> metaFile; + + if (conf.doUseMappedFile()) { + metaFile = shared_ptr<MappedFileWrapper>(new MappedFileWrapper); + dataFile = shared_ptr<MappedFileWrapper>(new MappedFileWrapper); + } else { + metaFile = shared_ptr<CFileWrapper>(new CFileWrapper); + dataFile = shared_ptr<CFileWrapper>(new CFileWrapper); + } + + metaFile->open(fds->metafd, false); + dataFile->open(fds->datafd, false); + + dataFile->seek(0); + metaFile->seek(0); + + shared_ptr<ReadShortCircuitInfo> retval(new ReadShortCircuitInfo(key, false)); + + retval->setFdHolder(fds); + retval->setDataFile(dataFile); + retval->setMetaFile(metaFile); + + return retval; +} +} +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/ReadShortCircuitInfo.h ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/src/client/ReadShortCircuitInfo.h b/depends/libhdfs3/src/client/ReadShortCircuitInfo.h new file mode 100644 index 0000000..dfb98d8 --- /dev/null +++ b/depends/libhdfs3/src/client/ReadShortCircuitInfo.h @@ -0,0 +1,193 @@ +/******************************************************************** + * Copyright (c) 2013 - 2014, Pivotal Inc. + * All rights reserved. + * + * Author: Zhanwei Wang + ********************************************************************/ +/******************************************************************** + * 2014 - + * open source under Apache License Version 2.0 + ********************************************************************/ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef _HDFS_LIBHDFS3_SERVER_READSHORTCIRCUITINFO_H_ +#define _HDFS_LIBHDFS3_SERVER_READSHORTCIRCUITINFO_H_ + +#include "FileWrapper.h" +#include "Hash.h" +#include "LruMap.h" +#include "Memory.h" +#include "network/Socket.h" +#include "rpc/RpcAuth.h" +#include "server/BlockLocalPathInfo.h" +#include "server/DatanodeInfo.h" +#include "server/ExtendedBlock.h" +#include "SessionConfig.h" +#include "Thread.h" +#include "Token.h" + +namespace Hdfs { +namespace Internal { + +struct ReadShortCircuitInfoKey { + ReadShortCircuitInfoKey(uint32_t dnPort, int64_t blockId, + const std::string& bpid) + : dnPort(dnPort), blockId(blockId), bpid(bpid) {} + + size_t hash_value() const { + size_t values[] = {Int32Hasher(dnPort), Int64Hasher(blockId), + StringHasher(bpid)}; + return CombineHasher(values, sizeof(values) / sizeof(values[0])); + } + + bool operator==(const ReadShortCircuitInfoKey& other) const { + return dnPort == other.dnPort && blockId == other.blockId && + bpid == other.bpid; + } + + uint32_t dnPort; + int64_t blockId; + std::string bpid; +}; + +struct ReadShortCircuitFDHolder { + public: + ReadShortCircuitFDHolder() : metafd(-1), datafd(-1) {} + ~ReadShortCircuitFDHolder(); + + int metafd; + int datafd; +}; + +class ReadShortCircuitInfo { + public: + ReadShortCircuitInfo(const ReadShortCircuitInfoKey& key, bool legacy) + : legacy(legacy), + valid(true), + blockId(key.blockId), + bpid(key.bpid), + dnPort(key.dnPort) {} + + ~ReadShortCircuitInfo(); + + const shared_ptr<FileWrapper>& getDataFile() const { return dataFile; } + + void setDataFile(shared_ptr<FileWrapper> dataFile) { + this->dataFile = dataFile; + } + + const shared_ptr<FileWrapper>& getMetaFile() const { return metaFile; } + + void setMetaFile(shared_ptr<FileWrapper> metaFile) { + this->metaFile = metaFile; + } + + bool isValid() const { return valid; } + + void setValid(bool valid) { this->valid = valid; } + + int64_t getBlockId() const { return blockId; } + + void setBlockId(int64_t blockId) { this->blockId = blockId; } + + const std::string& getBpid() const { return bpid; } + + void setBpid(const std::string& bpid) { this->bpid = bpid; } + + uint32_t getDnPort() const { return dnPort; } + + void setDnPort(uint32_t dnPort) { this->dnPort = dnPort; } + + ReadShortCircuitInfoKey getKey() const { + return ReadShortCircuitInfoKey(dnPort, blockId, bpid); + } + + bool isLegacy() const { return legacy; } + + void setLegacy(bool legacy) { this->legacy = legacy; } + + const shared_ptr<ReadShortCircuitFDHolder>& getFdHolder() const { + return fdHolder; + } + + void setFdHolder(const shared_ptr<ReadShortCircuitFDHolder>& fdHolder) { + this->fdHolder = fdHolder; + } + + const std::string formatBlockInfo() const { + ExtendedBlock block; + block.setBlockId(blockId); + block.setPoolId(bpid); + return block.toString(); + } + + private: + bool legacy; + bool valid; + shared_ptr<FileWrapper> dataFile; + shared_ptr<FileWrapper> metaFile; + shared_ptr<ReadShortCircuitFDHolder> fdHolder; + int64_t blockId; + std::string bpid; + uint32_t dnPort; +}; + +typedef LruMap<ReadShortCircuitInfoKey, shared_ptr<ReadShortCircuitFDHolder> > + ReadShortCircuitFDCacheType; +typedef LruMap<ReadShortCircuitInfoKey, BlockLocalPathInfo> + BlockLocalPathInfoCacheType; + +class ReadShortCircuitInfoBuilder { + public: + ReadShortCircuitInfoBuilder(const DatanodeInfo& dnInfo, const RpcAuth& auth, + const SessionConfig& conf); + shared_ptr<ReadShortCircuitInfo> fetchOrCreate(const ExtendedBlock& block, + const Token token); + static void release(const 
ReadShortCircuitInfo& info); + + private: + BlockLocalPathInfo getBlockLocalPathInfo(const ExtendedBlock& block, + const Token& token); + void invalidBlockLocalPathInfo(const ExtendedBlock& block); + shared_ptr<ReadShortCircuitInfo> createReadShortCircuitInfo( + const ReadShortCircuitInfoKey& key, const BlockLocalPathInfo& info); + shared_ptr<ReadShortCircuitInfo> createReadShortCircuitInfo( + const ReadShortCircuitInfoKey& key, const ExtendedBlock& block, + const Token& token); + shared_ptr<ReadShortCircuitInfo> createReadShortCircuitInfo( + const ReadShortCircuitInfoKey& key, + const shared_ptr<ReadShortCircuitFDHolder>& fds); + std::string buildDomainSocketAddress(uint32_t port); + shared_ptr<Socket> getDomainSocketConnection(const std::string& addr); + shared_ptr<ReadShortCircuitFDHolder> receiveReadShortCircuitFDs( + Socket& sock, const ExtendedBlock& block); + + private: + DatanodeInfo dnInfo; + RpcAuth auth; + SessionConfig conf; + static const int MaxReadShortCircuitVersion = 1; + static ReadShortCircuitFDCacheType ReadShortCircuitFDCache; + static BlockLocalPathInfoCacheType BlockLocalPathInfoCache; +}; +} +} + +HDFS_HASH_DEFINE(::Hdfs::Internal::ReadShortCircuitInfoKey); + +#endif /* _HDFS_LIBHDFS3_SERVER_READSHORTCIRCUITINFO_H_ */ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/RemoteBlockReader.cpp ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/src/client/RemoteBlockReader.cpp b/depends/libhdfs3/src/client/RemoteBlockReader.cpp new file mode 100644 index 0000000..399fb91 --- /dev/null +++ b/depends/libhdfs3/src/client/RemoteBlockReader.cpp @@ -0,0 +1,373 @@ +/******************************************************************** + * Copyright (c) 2013 - 2014, Pivotal Inc. + * All rights reserved. + * + * Author: Zhanwei Wang + ********************************************************************/ +/******************************************************************** + * 2014 - + * open source under Apache License Version 2.0 + ********************************************************************/ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "BigEndian.h" +#include "DataTransferProtocolSender.h" +#include "Exception.h" +#include "ExceptionInternal.h" +#include "HWCrc32c.h" +#include "RemoteBlockReader.h" +#include "SWCrc32c.h" +#include "WriteBuffer.h" + +#include <inttypes.h> +#include <vector> + +namespace Hdfs { +namespace Internal { + +RemoteBlockReader::RemoteBlockReader(const ExtendedBlock& eb, + DatanodeInfo& datanode, + PeerCache& peerCache, int64_t start, + int64_t len, const Token& token, + const char* clientName, bool verify, + SessionConfig& conf) + : sentStatus(false), + verify(verify), + binfo(eb), + datanode(datanode), + checksumSize(0), + chunkSize(0), + position(0), + size(0), + cursor(start), + endOffset(len + start), + lastSeqNo(-1), + peerCache(peerCache) { + + assert(start >= 0); + readTimeout = conf.getInputReadTimeout(); + writeTimeout = conf.getInputWriteTimeout(); + connTimeout = conf.getInputConnTimeout(); + sock = getNextPeer(datanode); + in = shared_ptr<BufferedSocketReader>(new BufferedSocketReaderImpl(*sock)); + sender = shared_ptr<DataTransferProtocol>(new DataTransferProtocolSender( + *sock, writeTimeout, datanode.formatAddress())); + sender->readBlock(eb, token, clientName, start, len); + checkResponse(); +} + +RemoteBlockReader::~RemoteBlockReader() { + if (sentStatus) { + peerCache.addConnection(sock, datanode); + } else { + sock->close(); + } +} + +shared_ptr<Socket> RemoteBlockReader::getNextPeer(const DatanodeInfo& dn) { + shared_ptr<Socket> sock; + try { + sock = peerCache.getConnection(dn); + + if (!sock) { + sock = shared_ptr<Socket>(new TcpSocketImpl); + sock->connect(dn.getIpAddr().c_str(), dn.getXferPort(), + connTimeout); + sock->setNoDelay(true); + } + } catch (const HdfsTimeoutException & e) { + NESTED_THROW(HdfsIOException, + "RemoteBlockReader: Failed to connect to %s", + dn.formatAddress().c_str()); + } + + return sock; +} + +void RemoteBlockReader::checkResponse() { + std::vector<char> respBuffer; + int32_t respSize = in->readVarint32(readTimeout); + + if (respSize <= 0 || respSize > 10 * 1024 * 1024) { + THROW(HdfsIOException, "RemoteBlockReader get a invalid response size: %d, Block: %s, from Datanode: %s", + respSize, binfo.toString().c_str(), datanode.formatAddress().c_str()); + } + + respBuffer.resize(respSize); + in->readFully(&respBuffer[0], respSize, readTimeout); + BlockOpResponseProto resp; + + if (!resp.ParseFromArray(&respBuffer[0], respBuffer.size())) { + THROW(HdfsIOException, "RemoteBlockReader cannot parse BlockOpResponseProto from Datanode response, " + "Block: %s, from Datanode: %s", binfo.toString().c_str(), datanode.formatAddress().c_str()); + } + + if (resp.status() != Status::DT_PROTO_SUCCESS) { + std::string msg; + + if (resp.has_message()) { + msg = resp.message(); + } + + if (resp.status() == Status::DT_PROTO_ERROR_ACCESS_TOKEN) { + THROW(HdfsInvalidBlockToken, "RemoteBlockReader: block's token is invalid. Datanode: %s, Block: %s", + datanode.formatAddress().c_str(), binfo.toString().c_str()); + } else { + THROW(HdfsIOException, + "RemoteBlockReader: Datanode return an error when sending read request to Datanode: %s, Block: %s, %s.", + datanode.formatAddress().c_str(), binfo.toString().c_str(), + (msg.empty() ? 
"check Datanode's log for more information" : msg.c_str())); + } + } + + const ReadOpChecksumInfoProto & checksumInfo = resp.readopchecksuminfo(); + const ChecksumProto & cs = checksumInfo.checksum(); + chunkSize = cs.bytesperchecksum(); + + if (chunkSize < 0) { + THROW(HdfsIOException, + "RemoteBlockReader invalid chunk size: %d, expected range[0, %" PRId64 "], Block: %s, from Datanode: %s", + chunkSize, binfo.getNumBytes(), binfo.toString().c_str(), datanode.formatAddress().c_str()); + } + + switch (cs.type()) { + case ChecksumTypeProto::CHECKSUM_NULL: + verify = false; + checksumSize = 0; + break; + + case ChecksumTypeProto::CHECKSUM_CRC32: + THROW(HdfsIOException, "RemoteBlockReader does not support CRC32 checksum, Block: %s, from Datanode: %s", + binfo.toString().c_str(), datanode.formatAddress().c_str()); + break; + + case ChecksumTypeProto::CHECKSUM_CRC32C: + if (HWCrc32c::available()) { + checksum = shared_ptr<Checksum>(new HWCrc32c()); + } else { + checksum = shared_ptr<Checksum>(new SWCrc32c()); + } + + checksumSize = sizeof(int32_t); + break; + + default: + THROW(HdfsIOException, "RemoteBlockReader cannot recognize checksum type: %d, Block: %s, from Datanode: %s", + static_cast<int>(cs.type()), binfo.toString().c_str(), datanode.formatAddress().c_str()); + } + + /* + * The offset into the block at which the first packet + * will start. This is necessary since reads will align + * backwards to a checksum chunk boundary. + */ + int64_t firstChunkOffset = checksumInfo.chunkoffset(); + + if (firstChunkOffset < 0 || firstChunkOffset > cursor || firstChunkOffset <= cursor - chunkSize) { + THROW(HdfsIOException, + "RemoteBlockReader invalid first chunk offset: %" PRId64 ", expected range[0, %" PRId64 "], " "Block: %s, from Datanode: %s", + firstChunkOffset, cursor, binfo.toString().c_str(), datanode.formatAddress().c_str()); + } +} + +shared_ptr<PacketHeader> RemoteBlockReader::readPacketHeader() { + try { + shared_ptr<PacketHeader> retval; + static const int packetHeaderLen = PacketHeader::GetPkgHeaderSize(); + std::vector<char> buf(packetHeaderLen); + + if (lastHeader && lastHeader->isLastPacketInBlock()) { + THROW(HdfsIOException, "RemoteBlockReader: read over block end from Datanode: %s, Block: %s.", + datanode.formatAddress().c_str(), binfo.toString().c_str()); + } + + in->readFully(&buf[0], packetHeaderLen, readTimeout); + retval = shared_ptr<PacketHeader>(new PacketHeader); + retval->readFields(&buf[0], packetHeaderLen); + return retval; + } catch (const HdfsIOException & e) { + NESTED_THROW(HdfsIOException, "RemoteBlockReader: failed to read block header for Block: %s from Datanode: %s.", + binfo.toString().c_str(), datanode.formatAddress().c_str()); + } +} + +void RemoteBlockReader::readNextPacket() { + assert(position >= size); + lastHeader = readPacketHeader(); + int dataSize = lastHeader->getDataLen(); + int64_t pendingAhead = 0; + + if (!lastHeader->sanityCheck(lastSeqNo)) { + THROW(HdfsIOException, "RemoteBlockReader: Packet failed on sanity check for block %s from Datanode %s.", + binfo.toString().c_str(), datanode.formatAddress().c_str()); + } + + assert(dataSize > 0 || lastHeader->getPacketLen() == sizeof(int32_t)); + + if (dataSize > 0) { + int chunks = (dataSize + chunkSize - 1) / chunkSize; + int checksumLen = chunks * checksumSize; + size = checksumLen + dataSize; + assert(size == lastHeader->getPacketLen() - static_cast<int>(sizeof(int32_t))); + buffer.resize(size); + in->readFully(&buffer[0], size, readTimeout); + lastSeqNo = lastHeader->getSeqno(); + + if 
(lastHeader->getPacketLen() != static_cast<int>(sizeof(int32_t)) + dataSize + checksumLen) { + THROW(HdfsIOException, "Invalid Packet, packetLen is %d, dataSize is %d, checksum size is %d", + lastHeader->getPacketLen(), dataSize, checksumLen); + } + + if (verify) { + verifyChecksum(chunks); + } + + /* + * skip checksum + */ + position = checksumLen; + /* + * the first packet we receive may start at a position before the one we requested + */ + pendingAhead = cursor - lastHeader->getOffsetInBlock(); + pendingAhead = pendingAhead > 0 ? pendingAhead : 0; + position += pendingAhead; + } + + /* + * once we reach the end of the requested range, send the read status to the datanode + * if it is not going to send any more data. + */ + + if (cursor + dataSize - pendingAhead >= endOffset && readTrailingEmptyPacket()) { + sendStatus(); + } +} + +bool RemoteBlockReader::readTrailingEmptyPacket() { + shared_ptr<PacketHeader> trailingHeader = readPacketHeader(); + + if (!trailingHeader->isLastPacketInBlock() || trailingHeader->getDataLen() != 0) { + return false; + } + + return true; +} + +void RemoteBlockReader::sendStatus() { + ClientReadStatusProto status; + + if (verify) { + status.set_status(Status::DT_PROTO_CHECKSUM_OK); + } else { + status.set_status(Status::DT_PROTO_SUCCESS); + } + + WriteBuffer buffer; + int size = status.ByteSize(); + buffer.writeVarint32(size); + status.SerializeToArray(buffer.alloc(size), size); + sock->writeFully(buffer.getBuffer(0), buffer.getDataSize(0), writeTimeout); + sentStatus = true; +} + +void RemoteBlockReader::verifyChecksum(int chunks) { + int dataSize = lastHeader->getDataLen(); + char * pchecksum = &buffer[0]; + char * pdata = &buffer[0] + (chunks * checksumSize); + + for (int i = 0; i < chunks; ++i) { + int size = chunkSize < dataSize ? chunkSize : dataSize; + dataSize -= size; + checksum->reset(); + checksum->update(pdata + (i * chunkSize), size); + uint32_t result = checksum->getValue(); + uint32_t target = ReadBigEndian32FromArray(pchecksum + (i * checksumSize)); + + if (result != target && size == chunkSize) { + THROW(ChecksumException, "RemoteBlockReader: checksum does not match for Block: %s, on Datanode: %s", + binfo.toString().c_str(), datanode.formatAddress().c_str()); + } + } + + assert(0 == dataSize); +} + +int64_t RemoteBlockReader::available() { + return size - position > 0 ? size - position : 0; +} + +int32_t RemoteBlockReader::read(char * buf, int32_t len) { + assert(0 != len && NULL != buf); + + if (cursor >= endOffset) { + THROW(HdfsIOException, "RemoteBlockReader: read over block end from Datanode: %s, Block: %s.", + datanode.formatAddress().c_str(), binfo.toString().c_str()); + } + + try { + if (position >= size) { + readNextPacket(); + } + + int32_t todo = len < size - position ? 
len : size - position; + memcpy(buf, &buffer[position], todo); + position += todo; + cursor += todo; + return todo; + } catch (const HdfsTimeoutException & e) { + NESTED_THROW(HdfsIOException, "RemoteBlockReader: failed to read Block: %s from Datanode: %s.", + binfo.toString().c_str(), datanode.formatAddress().c_str()); + } catch (const HdfsNetworkException & e) { + NESTED_THROW(HdfsIOException, "RemoteBlockReader: failed to read Block: %s from Datanode: %s.", + binfo.toString().c_str(), datanode.formatAddress().c_str()); + } +} + +void RemoteBlockReader::skip(int64_t len) { + int64_t todo = len; + assert(cursor + len <= endOffset); + + try { + while (todo > 0) { + if (cursor >= endOffset) { + THROW(HdfsIOException, "RemoteBlockReader: skip over block end from Datanode: %s, Block: %s.", + datanode.formatAddress().c_str(), binfo.toString().c_str()); + } + + if (position >= size) { + readNextPacket(); + } + + int batch = size - position; + batch = batch < todo ? batch : static_cast<int>(todo); + position += batch; + cursor += batch; + todo -= batch; + } + } catch (const HdfsTimeoutException & e) { + NESTED_THROW(HdfsIOException, "RemoteBlockReader: failed to read Block: %s from Datanode: %s.", + binfo.toString().c_str(), datanode.formatAddress().c_str()); + } catch (const HdfsNetworkException & e) { + NESTED_THROW(HdfsIOException, "RemoteBlockReader: failed to read Block: %s from Datanode: %s.", + binfo.toString().c_str(), datanode.formatAddress().c_str()); + } +} + +} +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/RemoteBlockReader.h ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/src/client/RemoteBlockReader.h b/depends/libhdfs3/src/client/RemoteBlockReader.h new file mode 100644 index 0000000..1b81e7a --- /dev/null +++ b/depends/libhdfs3/src/client/RemoteBlockReader.h @@ -0,0 +1,111 @@ +/******************************************************************** + * Copyright (c) 2013 - 2014, Pivotal Inc. + * All rights reserved. + * + * Author: Zhanwei Wang + ********************************************************************/ +/******************************************************************** + * 2014 - + * open source under Apache License Version 2.0 + ********************************************************************/ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef _HDFS_LIBHDFS3_CLIENT_REMOTEBLOCKREADER_H_ +#define _HDFS_LIBHDFS3_CLIENT_REMOTEBLOCKREADER_H_ + +#include "BlockReader.h" +#include "Checksum.h" +#include "DataTransferProtocol.h" +#include "Memory.h" +#include "network/BufferedSocketReader.h" +#include "network/TcpSocket.h" +#include "PacketHeader.h" +#include "PeerCache.h" +#include "server/DatanodeInfo.h" +#include "server/LocatedBlocks.h" +#include "SessionConfig.h" + +namespace Hdfs { +namespace Internal { + +class RemoteBlockReader: public BlockReader { +public: + RemoteBlockReader(const ExtendedBlock& eb, DatanodeInfo& datanode, + PeerCache& peerCache, int64_t start, int64_t len, + const Token& token, const char* clientName, bool verify, + SessionConfig& conf); + + ~RemoteBlockReader(); + + /** + * Get how many bytes can be read without blocking. + * @return The number of bytes that can be read without blocking. + */ + virtual int64_t available(); + + /** + * Read data from the block. + * @param buf the buffer to be filled. + * @param len the number of bytes to be read. + * @return the number of bytes filled into the buffer; + * it may be less than len. Returns 0 when the end of the block is reached. + */ + virtual int32_t read(char * buf, int32_t len); + + /** + * Move the cursor forward len bytes. + * @param len The number of bytes to skip. + */ + virtual void skip(int64_t len); + +private: + bool readTrailingEmptyPacket(); + shared_ptr<PacketHeader> readPacketHeader(); + shared_ptr<Socket> getNextPeer(const DatanodeInfo& dn); + void checkResponse(); + void readNextPacket(); + void sendStatus(); + void verifyChecksum(int chunks); + +private: + bool sentStatus; + bool verify; //whether to verify the checksum. + const ExtendedBlock & binfo; + DatanodeInfo & datanode; + int checksumSize; + int chunkSize; + int connTimeout; + int position; //position in buffer. + int readTimeout; + int size; //data size in buffer. + int writeTimeout; + int64_t cursor; //position in block. + int64_t endOffset; //offset in block requested to read to. + int64_t lastSeqNo; //seqno of the last packet received + PeerCache& peerCache; + shared_ptr<BufferedSocketReader> in; + shared_ptr<Checksum> checksum; + shared_ptr<DataTransferProtocol> sender; + shared_ptr<PacketHeader> lastHeader; + shared_ptr<Socket> sock; + std::vector<char> buffer; +}; + +} +} +#endif /* _HDFS_LIBHDFS3_CLIENT_REMOTEBLOCKREADER_H_ */ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/Token.cpp ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/src/client/Token.cpp b/depends/libhdfs3/src/client/Token.cpp new file mode 100644 index 0000000..a0cb8c0 --- /dev/null +++ b/depends/libhdfs3/src/client/Token.cpp @@ -0,0 +1,180 @@ +/******************************************************************** + * Copyright (c) 2013 - 2014, Pivotal Inc. + * All rights reserved. + * + * Author: Zhanwei Wang + ********************************************************************/ +/******************************************************************** + * 2014 - + * open source under Apache License Version 2.0 + ********************************************************************/ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "Exception.h" +#include "ExceptionInternal.h" +#include "Hash.h" +#include "Token.h" +#include "WritableUtils.h" + +#include <gsasl.h> + +using namespace Hdfs::Internal; + +namespace Hdfs { +namespace Internal { + +static std::string Base64Encode(const char * input, size_t len) { + int rc = 0; + size_t outLen; + char * output = NULL; + std::string retval; + + if (GSASL_OK != (rc = gsasl_base64_to(input, len, &output, &outLen))) { + assert(GSASL_MALLOC_ERROR == rc); + throw std::bad_alloc(); + } + + assert(NULL != output); + retval = output; + gsasl_free(output); + + for (size_t i = 0 ; i < retval.length(); ++i) { + switch (retval[i]) { + case '+': + retval[i] = '-'; + break; + + case '/': + retval[i] = '_'; + break; + + case '=': + retval.resize(i); + break; + + default: + break; + } + } + + return retval; +} + +static void Base64Decode(const std::string & urlSafe, + std::vector<char> & buffer) { + int retval = 0, append = 0; + size_t outLen; + char * output = NULL; + std::string input = urlSafe; + + for (size_t i = 0; i < input.length(); ++i) { + switch (input[i]) { + case '-': + input[i] = '+'; + break; + + case '_': + input[i] = '/'; + break; + + default: + break; + } + } + + while (true) { + retval = gsasl_base64_from(&input[0], input.length(), &output, &outLen); + + if (GSASL_OK != retval) { + switch (retval) { + case GSASL_BASE64_ERROR: + if (append++ < 2) { + input.append("="); + continue; + } + + throw std::invalid_argument( + "invalid input of gsasl_base64_from"); + + case GSASL_MALLOC_ERROR: + throw std::bad_alloc(); + + default: + assert( + false + && "unexpected return value from gsasl_base64_from"); + } + } + + break; + } + + assert(outLen >= 0); + buffer.resize(outLen); + memcpy(&buffer[0], output, outLen); + gsasl_free(output); +} + +std::string Token::toString() const { + try { + size_t len = 0; + std::vector<char> buffer(1024); + WritableUtils out(&buffer[0], buffer.size()); + len += out.WriteInt32(identifier.size()); + len += out.WriteRaw(&identifier[0], identifier.size()); + len += out.WriteInt32(password.size()); + len += out.WriteRaw(&password[0], password.size()); + len += out.WriteText(kind); + len += out.WriteText(service); + return Base64Encode(&buffer[0], len); + } catch (...) { + NESTED_THROW(HdfsIOException, "cannot convert token to string"); + } +} + +Token & Token::fromString(const std::string & str) { + int32_t len; + + try { + std::vector<char> buffer; + Base64Decode(str, buffer); + WritableUtils in(&buffer[0], buffer.size()); + len = in.ReadInt32(); + identifier.resize(len); + in.ReadRaw(&identifier[0], len); + len = in.ReadInt32(); + password.resize(len); + in.ReadRaw(&password[0], len); + kind = in.ReadText(); + service = in.ReadText(); + return *this; + } catch (...) 
{ + NESTED_THROW(HdfsInvalidBlockToken, + "cannot construct a token from the string"); + } +} + +size_t Token::hash_value() const { + size_t values[] = { StringHasher(identifier), StringHasher(password), + StringHasher(kind), StringHasher(service) + }; + return CombineHasher(values, sizeof(values) / sizeof(values[0])); +} + +} +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/Token.h ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/src/client/Token.h b/depends/libhdfs3/src/client/Token.h new file mode 100644 index 0000000..f3301d9 --- /dev/null +++ b/depends/libhdfs3/src/client/Token.h @@ -0,0 +1,91 @@ +/******************************************************************** + * Copyright (c) 2013 - 2014, Pivotal Inc. + * All rights reserved. + * + * Author: Zhanwei Wang + ********************************************************************/ +/******************************************************************** + * 2014 - + * open source under Apache License Version 2.0 + ********************************************************************/ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef _HDFS_LIBHDFS3_CLIENT_TOKEN_H_ +#define _HDFS_LIBHDFS3_CLIENT_TOKEN_H_ + +#include <string> + +namespace Hdfs { +namespace Internal { + +class Token { +public: + const std::string & getIdentifier() const { + return identifier; + } + + void setIdentifier(const std::string & identifier) { + this->identifier = identifier; + } + + const std::string & getKind() const { + return kind; + } + + void setKind(const std::string & kind) { + this->kind = kind; + } + + const std::string & getPassword() const { + return password; + } + + void setPassword(const std::string & password) { + this->password = password; + } + + const std::string & getService() const { + return service; + } + + void setService(const std::string & service) { + this->service = service; + } + + bool operator ==(const Token & other) const { + return identifier == other.identifier && password == other.password + && kind == other.kind && service == other.service; + } + + std::string toString() const; + + Token & fromString(const std::string & str); + + size_t hash_value() const; + +private: + std::string identifier; + std::string password; + std::string kind; + std::string service; +}; + +} +} + +#endif /* _HDFS_LIBHDFS3_CLIENT_TOKEN_H_ */ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/TokenInternal.h ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/src/client/TokenInternal.h b/depends/libhdfs3/src/client/TokenInternal.h new file mode 100644 index 0000000..141de25 --- /dev/null +++ b/depends/libhdfs3/src/client/TokenInternal.h @@ -0,0 +1,36 @@ +/******************************************************************** + * Copyright (c) 2013 - 2014, Pivotal Inc. + * All rights reserved. + * + * Author: Zhanwei Wang + ********************************************************************/ +/******************************************************************** + * 2014 - + * open source under Apache License Version 2.0 + ********************************************************************/ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef _HDFS_LIBHDFS3_CLIENT_TOKENINTERNAL_H_ +#define _HDFS_LIBHDFS3_CLIENT_TOKENINTERNAL_H_ + +#include "Hash.h" +#include "Token.h" + +HDFS_HASH_DEFINE(::Hdfs::Internal::Token); + +#endif /* _HDFS_LIBHDFS3_CLIENT_TOKENINTERNAL_H_ */
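
The packet handling in RemoteBlockReader.cpp above relies on a small amount of offset and size arithmetic: the datanode aligns the first packet backwards to a checksum chunk boundary, each packet carries its checksums before its data, and the reader discards the unrequested leading bytes ("pendingAhead"). The following standalone sketch illustrates that arithmetic; the chunk size, requested offset, and packet data length are made-up example values, not taken from the patch.

// Standalone sketch (not part of the patch): the offset and size arithmetic
// that RemoteBlockReader::checkResponse()/readNextPacket() rely on.
// All concrete values below are hypothetical examples.
#include <cassert>
#include <cstdint>
#include <cstdio>

int main() {
    const int chunkSize = 512;                 // cs.bytesperchecksum() from the response
    const int checksumSize = sizeof(int32_t);  // 4 bytes per CRC32C checksum
    const std::int64_t cursor = 1300;          // byte offset the caller asked to start at

    // The datanode aligns the first packet backwards to a chunk boundary, so the
    // first chunk offset satisfies: cursor - chunkSize < offset <= cursor.
    const std::int64_t firstChunkOffset = (cursor / chunkSize) * chunkSize;  // 1024
    assert(firstChunkOffset >= 0 && firstChunkOffset <= cursor
           && firstChunkOffset > cursor - chunkSize);

    // Suppose the first packet carries 64 KiB of data starting at that offset.
    const int dataSize = 64 * 1024;
    const int chunks = (dataSize + chunkSize - 1) / chunkSize;  // 128 chunks
    const int checksumLen = chunks * checksumSize;              // 512 bytes of checksums
    // Packet layout on the wire: [4-byte payload length][checksums][data],
    // which is the identity readNextPacket() checks:
    const int packetLen = static_cast<int>(sizeof(int32_t)) + checksumLen + dataSize;

    // The bytes between firstChunkOffset and cursor were never requested; the
    // reader skips them by advancing its buffer position ("pendingAhead").
    const std::int64_t pendingAhead = cursor - firstChunkOffset;  // 276 bytes to discard

    std::printf("chunks=%d checksumLen=%d packetLen=%d pendingAhead=%lld\n",
                chunks, checksumLen, packetLen,
                static_cast<long long>(pendingAhead));
    return 0;
}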
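
Token.cpp above turns gsasl's standard Base64 output into a URL-safe form by mapping '+' to '-', '/' to '_', and dropping '=' padding, and reverses that mapping (re-adding padding) before decoding. The helpers below are a minimal sketch of just that character mapping, assuming standard Base64 input; the actual gsasl_base64_to/gsasl_base64_from calls are omitted.

// Standalone sketch (not part of the patch): the URL-safe mapping applied by
// Token.cpp's Base64Encode()/Base64Decode() around gsasl's Base64 routines.
#include <cassert>
#include <string>

// Standard Base64 -> URL-safe form: '+' -> '-', '/' -> '_', strip '=' padding.
static std::string ToUrlSafe(std::string s) {
    for (size_t i = 0; i < s.size(); ++i) {
        if (s[i] == '+') {
            s[i] = '-';
        } else if (s[i] == '/') {
            s[i] = '_';
        } else if (s[i] == '=') {
            s.resize(i);  // padding starts here; drop the rest
            break;
        }
    }
    return s;
}

// URL-safe form -> standard Base64: reverse the substitution and restore the
// '=' padding until the length is a multiple of 4 (mirroring, loosely, the
// retry-with-appended-'=' loop in Base64Decode()).
static std::string FromUrlSafe(std::string s) {
    for (size_t i = 0; i < s.size(); ++i) {
        if (s[i] == '-') {
            s[i] = '+';
        } else if (s[i] == '_') {
            s[i] = '/';
        }
    }
    while (s.size() % 4 != 0) {
        s.push_back('=');
    }
    return s;
}

int main() {
    // An arbitrary standard-Base64 string containing every character that needs mapping.
    const std::string standard = "A+b/Zw==";
    const std::string urlSafe = ToUrlSafe(standard);
    assert(urlSafe == "A-b_Zw");
    assert(FromUrlSafe(urlSafe) == standard);
    return 0;
}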
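
A caller of the Token class declared in Token.h above would typically populate the four fields, serialize with toString(), and rebuild with fromString(). The sketch below is a hypothetical usage example: the include path, the field values, and the assumption that libhdfs3's internal headers and gsasl are available to link against are all illustrative, not part of the patch.

// Standalone usage sketch (not part of the patch): round-tripping a Token
// through toString()/fromString(). Assumes libhdfs3 internals and gsasl are
// available to link against; the include path and field values are made up.
#include <cassert>
#include <iostream>

#include "client/Token.h"  // hypothetical include path into the libhdfs3 sources

int main() {
    using Hdfs::Internal::Token;

    Token original;
    original.setIdentifier("block-token-identifier");  // opaque bytes in practice
    original.setPassword("block-token-password");      // opaque bytes in practice
    original.setKind("HDFS_BLOCK_TOKEN");
    original.setService("127.0.0.1:8020");

    // toString() writes identifier/password as length-prefixed byte arrays and
    // kind/service as text, then URL-safe Base64 encodes the whole buffer.
    const std::string encoded = original.toString();

    Token restored;
    restored.fromString(encoded);
    assert(restored == original);

    std::cout << "encoded token: " << encoded << std::endl;
    return 0;
}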