http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/Pipeline.cpp
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/Pipeline.cpp b/depends/libhdfs3/src/client/Pipeline.cpp
new file mode 100644
index 0000000..3396e31
--- /dev/null
+++ b/depends/libhdfs3/src/client/Pipeline.cpp
@@ -0,0 +1,792 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "DateTime.h"
+#include "Pipeline.h"
+#include "Logger.h"
+#include "Exception.h"
+#include "ExceptionInternal.h"
+#include "OutputStreamInter.h"
+#include "FileSystemInter.h"
+#include "DataTransferProtocolSender.h"
+#include "datatransfer.pb.h"
+
+#include <inttypes.h>
+
+namespace Hdfs {
+namespace Internal {
+
+PipelineImpl::PipelineImpl(bool append, const char * path, const SessionConfig & conf,
+                           shared_ptr<FileSystemInter> filesystem, int checksumType, int chunkSize,
+                           int replication, int64_t bytesSent, PacketPool & packetPool, shared_ptr<LocatedBlock> lastBlock) :
+    checksumType(checksumType), chunkSize(chunkSize), errorIndex(-1), replication(replication), bytesAcked(
+        bytesSent), bytesSent(bytesSent), packetPool(packetPool), filesystem(filesystem), lastBlock(lastBlock), path(
+            path) {
+    canAddDatanode = conf.canAddDatanode();
+    blockWriteRetry = conf.getBlockWriteRetry();
+    connectTimeout = conf.getOutputConnTimeout();
+    readTimeout = conf.getOutputReadTimeout();
+    writeTimeout = conf.getOutputWriteTimeout();
+    clientName = filesystem->getClientName();
+
+    if (append) {
+        LOG(DEBUG2, "create pipeline for file %s to append to %s at position %" PRId64,
+            path, lastBlock->toString().c_str(), lastBlock->getNumBytes());
+        stage = PIPELINE_SETUP_APPEND;
+        assert(lastBlock);
+        nodes = lastBlock->getLocations();
+        storageIDs = lastBlock->getStorageIDs();
+        buildForAppendOrRecovery(false);
+        stage = DATA_STREAMING;
+    } else {
+        LOG(DEBUG2, "create pipeline for file %s to write to a new block", path);
+        stage = PIPELINE_SETUP_CREATE;
+        buildForNewBlock();
+        stage = DATA_STREAMING;
+    }
+}
+
+int PipelineImpl::findNewDatanode(const std::vector<DatanodeInfo> & original) {
+    if (nodes.size() != original.size() + 1) {
+        THROW(HdfsIOException, "Failed to acquire a datanode for block %s from namenode.",
+              lastBlock->toString().c_str());
+    }
+
+    for (size_t i = 0; i < nodes.size(); i++) {
+        size_t j = 0;
+
+        for (; j < original.size() && !(nodes[i] == original[j]); j++)
+            ;
+
+        if (j == original.size()) {
+            return i;
+        }
+    }
+
+    THROW(HdfsIOException, "Cannot add new datanode for block %s.", lastBlock->toString().c_str());
+}
+
+void PipelineImpl::transfer(const ExtendedBlock & blk, const DatanodeInfo & src,
+                            const std::vector<DatanodeInfo> & targets, const Token & token) {
+    shared_ptr<Socket> so(new TcpSocketImpl);
+    shared_ptr<BufferedSocketReader> in(new BufferedSocketReaderImpl(*so));
+    so->connect(src.getIpAddr().c_str(), src.getXferPort(), connectTimeout);
+    DataTransferProtocolSender sender(*so, writeTimeout, src.formatAddress());
+    sender.transferBlock(blk, token, clientName.c_str(), targets);
+    int size;
+    size = in->readVarint32(readTimeout);
+    std::vector<char> buf(size);
+    in->readFully(&buf[0], size, readTimeout);
+    BlockOpResponseProto resp;
+
+    if (!resp.ParseFromArray(&buf[0], size)) {
+        THROW(HdfsIOException, "cannot parse datanode response from %s for block %s.",
+              src.formatAddress().c_str(), lastBlock->toString().c_str());
+    }
+
+    if (Status::DT_PROTO_SUCCESS != resp.status()) {
+        THROW(HdfsIOException, "Failed to transfer block to a new datanode %s for block %s.",
+              targets[0].formatAddress().c_str(),
+              lastBlock->toString().c_str());
+    }
+}
+
+bool PipelineImpl::addDatanodeToPipeline(const std::vector<DatanodeInfo> & excludedNodes) {
+    try {
+        /*
+         * get a new datanode
+         */
+        std::vector<DatanodeInfo> original = nodes;
+        shared_ptr<LocatedBlock> lb;
+        lb = filesystem->getAdditionalDatanode(path, *lastBlock, nodes, storageIDs,
+                                               excludedNodes, 1);
+        nodes = lb->getLocations();
+        storageIDs = lb->getStorageIDs();
+
+        /*
+         * failed to add new datanode into pipeline.
+         */
+        if (original.size() == nodes.size()) {
+            LOG(LOG_ERROR,
+                "Failed to add new datanode into pipeline for block: %s file 
%s.",
+                lastBlock->toString().c_str(), path.c_str());
+        } else {
+            /*
+             * find the new datanode
+             */
+            int d = findNewDatanode(original);
+            /*
+             * in case transfer block fail.
+             */
+            errorIndex = d;
+            /*
+             * transfer replica
+             */
+            DatanodeInfo & src = d == 0 ? nodes[1] : nodes[d - 1];
+            std::vector<DatanodeInfo> targets;
+            targets.push_back(nodes[d]);
+            LOG(INFO, "Replicate block %s from %s to %s for file %s.", 
lastBlock->toString().c_str(),
+                src.formatAddress().c_str(), 
targets[0].formatAddress().c_str(), path.c_str());
+            transfer(*lastBlock, src, targets, lb->getToken());
+            errorIndex = -1;
+            return true;
+        }
+    } catch (const HdfsCanceled & e) {
+        throw;
+    } catch (const HdfsFileSystemClosed & e) {
+        throw;
+    } catch (const SafeModeException & e) {
+        throw;
+    } catch (const HdfsException & e) {
+        std::string buffer;
+        LOG(LOG_ERROR,
+            "Failed to add a new datanode into pipeline for block: %s file 
%s.\n%s",
+            lastBlock->toString().c_str(), path.c_str(), GetExceptionDetail(e, 
buffer));
+    }
+
+    return false;
+}
+
+void PipelineImpl::checkPipelineWithReplicas() {
+    if (static_cast<int>(nodes.size()) < replication) {
+        std::stringstream ss;
+        ss.imbue(std::locale::classic());
+        int size = nodes.size();
+
+        for (int i = 0; i < size - 1; ++i) {
+            ss << nodes[i].formatAddress() << ", ";
+        }
+
+        if (nodes.empty()) {
+            ss << "Empty";
+        } else {
+            ss << nodes.back().formatAddress();
+        }
+
+        LOG(WARNING,
+            "the number of nodes in pipeline is %d [%s], is less than the 
expected number of replica %d for block %s file %s",
+            static_cast<int>(nodes.size()), ss.str().c_str(), replication,
+            lastBlock->toString().c_str(), path.c_str());
+    }
+}
+
+void PipelineImpl::buildForAppendOrRecovery(bool recovery) {
+    int64_t gs = 0;
+    int retry = blockWriteRetry;
+    exception_ptr lastException;
+    std::vector<DatanodeInfo> excludedNodes;
+    shared_ptr<LocatedBlock> lb;
+    std::string buffer;
+
+    do {
+        /*
+         * Remove bad datanode from list of datanodes.
+         * If errorIndex was not set (i.e. appends), then do not remove
+         * any datanodes
+         */
+        if (errorIndex >= 0) {
+            assert(lastBlock);
+            LOG(LOG_ERROR, "Pipeline: node %s is invalid and removed from 
pipeline when %s block %s for file %s, stage = %s.",
+                nodes[errorIndex].formatAddress().c_str(),
+                (recovery ? "recovery" : "append to"), 
lastBlock->toString().c_str(),
+                path.c_str(), StageToString(stage));
+            excludedNodes.push_back(nodes[errorIndex]);
+            nodes.erase(nodes.begin() + errorIndex);
+
+            if (!storageIDs.empty()) {
+                storageIDs.erase(storageIDs.begin() + errorIndex);
+            }
+
+            if (nodes.empty()) {
+                THROW(HdfsIOException,
+                      "Build pipeline to %s block %s failed: all datanodes are 
bad.",
+                      (recovery ? "recovery" : "append to"), 
lastBlock->toString().c_str());
+            }
+
+            errorIndex = -1;
+        }
+
+        try {
+            gs = 0;
+
+            /*
+             * Check if the number of datanodes in the pipeline satisfies the replication requirement;
+             * add a new datanode if not.
+             */
+            if (stage != PIPELINE_SETUP_CREATE && stage != PIPELINE_CLOSE
+                    && static_cast<int>(nodes.size()) < replication && canAddDatanode) {
+                if (!addDatanodeToPipeline(excludedNodes)) {
+                    THROW(HdfsIOException,
+                          "Failed to add new datanode into pipeline for block: 
%s file %s, "
+                          "set \"output.replace-datanode-on-failure\" to 
\"false\" to disable this feature.",
+                          lastBlock->toString().c_str(), path.c_str());
+                }
+            }
+
+            if (errorIndex >= 0) {
+                continue;
+            }
+
+            checkPipelineWithReplicas();
+            /*
+             * Update generation stamp and access token
+             */
+            lb = filesystem->updateBlockForPipeline(*lastBlock);
+            gs = lb->getGenerationStamp();
+            /*
+             * Try to build pipeline
+             */
+            createBlockOutputStream(lb->getToken(), gs, recovery);
+            /*
+             * everything is ok, reset errorIndex.
+             */
+            errorIndex = -1;
+            lastException = exception_ptr();
+            break; //break on success
+        } catch (const HdfsInvalidBlockToken & e) {
+            lastException = current_exception();
+            recovery = true;
+            LOG(LOG_ERROR,
+                "Pipeline: Failed to build pipeline for block %s file %s, new 
generation stamp is %" PRId64 ",\n%s",
+                lastBlock->toString().c_str(), path.c_str(), gs, 
GetExceptionDetail(e, buffer));
+            LOG(INFO, "Try to recovery pipeline for block %s file %s.",
+                lastBlock->toString().c_str(), path.c_str());
+        } catch (const HdfsTimeoutException & e) {
+            lastException = current_exception();
+            recovery = true;
+            LOG(LOG_ERROR,
+                "Pipeline: Failed to build pipeline for block %s file %s, new 
generation stamp is %" PRId64 ",\n%s",
+                lastBlock->toString().c_str(), path.c_str(), gs, 
GetExceptionDetail(e, buffer));
+            LOG(INFO, "Try to recovery pipeline for block %s file %s.",
+                lastBlock->toString().c_str(), path.c_str());
+        } catch (const HdfsIOException & e) {
+            lastException = current_exception();
+            /*
+             * Set the recovery flag to true in case we failed to create a pipeline for appending.
+             */
+            recovery = true;
+            LOG(LOG_ERROR,
+                "Pipeline: Failed to build pipeline for block %s file %s, new generation stamp is %" PRId64 ",\n%s",
+                lastBlock->toString().c_str(), path.c_str(), gs, GetExceptionDetail(e, buffer));
+            LOG(INFO, "Try to recover pipeline for block %s file %s.", lastBlock->toString().c_str(), path.c_str());
+        }
+
+        /*
+         * We don't know what happened; no datanode reported a failure, so reduce the retry count to avoid an infinite loop.
+         * It may be caused by an RPC call throwing HdfsIOException.
+         */
+        if (errorIndex < 0) {
+            --retry;
+        }
+    } while (retry > 0);
+
+    if (lastException) {
+        rethrow_exception(lastException);
+    }
+
+    /*
+     * Update pipeline at the namenode, non-idempotent RPC call.
+     */
+    lb->setPoolId(lastBlock->getPoolId());
+    lb->setBlockId(lastBlock->getBlockId());
+    lb->setLocations(nodes);
+    lb->setStorageIDs(storageIDs);
+    lb->setNumBytes(lastBlock->getNumBytes());
+    lb->setOffset(lastBlock->getOffset());
+    filesystem->updatePipeline(*lastBlock, *lb, nodes, storageIDs);
+    lastBlock = lb;
+}
+
+void PipelineImpl::locateNextBlock(
+    const std::vector<DatanodeInfo> & excludedNodes) {
+    milliseconds sleeptime(100);
+    milliseconds fiveSeconds(5000);
+    int retry = blockWriteRetry;
+
+    while (true) {
+        try {
+            lastBlock = filesystem->addBlock(path, lastBlock.get(),
+                                             excludedNodes);
+            assert(lastBlock);
+            return;
+        } catch (const NotReplicatedYetException & e) {
+            LOG(DEBUG1, "Got NotReplicatedYetException when try to addBlock 
for block %s, "
+                "already retry %d times, max retry %d times", 
lastBlock->toString().c_str(),
+                blockWriteRetry - retry, blockWriteRetry);
+
+            if (retry--) {
+                try {
+                    sleep_for(sleeptime);
+                } catch (...) {
+                }
+
+                sleeptime *= 2;
+                sleeptime = sleeptime < fiveSeconds ? sleeptime : fiveSeconds;
+            } else {
+                throw;
+            }
+        }
+    }
+}
+
+static std::string FormatExcludedNodes(
+    const std::vector<DatanodeInfo> & excludedNodes) {
+    std::stringstream ss;
+    ss.imbue(std::locale::classic());
+    ss << "[";
+    int size = excludedNodes.size();
+
+    for (int i = 0; i < size - 1; ++i) {
+        ss << excludedNodes[i].formatAddress() << ", ";
+    }
+
+    if (excludedNodes.empty()) {
+        ss << "Empty";
+    } else {
+        ss << excludedNodes.back().formatAddress();
+    }
+
+    ss << "]";
+    return ss.str();
+}
+
+void PipelineImpl::buildForNewBlock() {
+    int retryAllocNewBlock = 0, retry = blockWriteRetry;
+    LocatedBlock lb;
+    std::vector<DatanodeInfo> excludedNodes;
+    shared_ptr<LocatedBlock> block = lastBlock;
+    std::string buffer;
+
+    do {
+        errorIndex = -1;
+        lastBlock = block;
+
+        try {
+            locateNextBlock(excludedNodes);
+            lastBlock->setNumBytes(0);
+            nodes = lastBlock->getLocations();
+            storageIDs = lastBlock->getStorageIDs();
+        } catch (const HdfsRpcException & e) {
+            const char * lastBlockName = lastBlock ? lastBlock->toString().c_str() : "Null";
+            LOG(LOG_ERROR,
+                "Failed to allocate a new empty block for file %s, last block %s, excluded nodes %s.\n%s",
+                path.c_str(), lastBlockName, FormatExcludedNodes(excludedNodes).c_str(), GetExceptionDetail(e, buffer));
+
+            if (retryAllocNewBlock > blockWriteRetry) {
+                throw;
+            }
+
+            LOG(INFO, "Retry to allocate a new empty block for file %s, last 
block %s, excluded nodes %s.",
+                path.c_str(), lastBlockName, 
FormatExcludedNodes(excludedNodes).c_str());
+            ++retryAllocNewBlock;
+            continue;
+        } catch (const HdfsException & e) {
+            const char * lastBlockName = lastBlock ? lastBlock->toString().c_str() : "Null";
+            LOG(LOG_ERROR,
+                "Failed to allocate a new empty block for file %s, last block %s, excluded nodes %s.\n%s",
+                path.c_str(), lastBlockName, FormatExcludedNodes(excludedNodes).c_str(), GetExceptionDetail(e, buffer));
+            throw;
+        }
+
+        retryAllocNewBlock = 0;
+        checkPipelineWithReplicas();
+
+        if (nodes.empty()) {
+            THROW(HdfsIOException,
+                  "No datanode is available to create a pipeline for block %s 
file %s.",
+                  lastBlock->toString().c_str(), path.c_str());
+        }
+
+        try {
+            createBlockOutputStream(lastBlock->getToken(), 0, false);
+            break;  //break on success
+        } catch (const HdfsInvalidBlockToken & e) {
+            LOG(LOG_ERROR,
+                "Failed to setup the pipeline for new block %s file %s.\n%s",
+                lastBlock->toString().c_str(), path.c_str(), GetExceptionDetail(e, buffer));
+        } catch (const HdfsTimeoutException & e) {
+            LOG(LOG_ERROR,
+                "Failed to setup the pipeline for new block %s file %s.\n%s",
+                lastBlock->toString().c_str(), path.c_str(), GetExceptionDetail(e, buffer));
+        } catch (const HdfsIOException & e) {
+            LOG(LOG_ERROR,
+                "Failed to setup the pipeline for new block %s file %s.\n%s",
+                lastBlock->toString().c_str(), path.c_str(), GetExceptionDetail(e, buffer));
+        }
+
+        LOG(INFO, "Abandoning block: %s for file %s.", 
lastBlock->toString().c_str(), path.c_str());
+
+        try {
+            filesystem->abandonBlock(*lastBlock, path);
+        } catch (const HdfsException & e) {
+            LOG(LOG_ERROR,
+                "Failed to abandon useless block %s for file %s.\n%s",
+                lastBlock->toString().c_str(), path.c_str(), GetExceptionDetail(e, buffer));
+            throw;
+        }
+
+        if (errorIndex >= 0) {
+            LOG(INFO, "Excluding invalid datanode: %s for block %s for file 
%s",
+                nodes[errorIndex].formatAddress().c_str(), 
lastBlock->toString().c_str(), path.c_str());
+            excludedNodes.push_back(nodes[errorIndex]);
+        } else {
+            /*
+             * We don't know what happened; no datanode reported a failure, so reduce the retry count to avoid an infinite loop.
+             */
+            --retry;
+        }
+    } while (retry);
+}
+
+/*
+ * The bad link node must be either empty or an "IP:PORT" string.
+ */
+void PipelineImpl::checkBadLinkFormat(const std::string & n) {
+    std::string node = n;
+
+    if (node.empty()) {
+        return;
+    }
+
+    do {
+        const char * host = &node[0], *port;
+        size_t pos = node.find_last_of(":");
+
+        if (pos == node.npos || pos + 1 == node.length()) {
+            break;
+        }
+
+        node[pos] = 0;
+        port = &node[pos + 1];
+        struct addrinfo hints, *addrs;
+        memset(&hints, 0, sizeof(hints));
+        hints.ai_family = PF_UNSPEC;
+        hints.ai_socktype = SOCK_STREAM;
+        hints.ai_flags = AI_NUMERICHOST | AI_NUMERICSERV;
+        int p;
+        char * end;
+        p = strtol(port, &end, 0);
+
+        if (p >= 65536 || p <= 0 || end != port + strlen(port)) {
+            break;
+        }
+
+        if (getaddrinfo(host, port, &hints, &addrs)) {
+            break;
+        }
+
+        freeaddrinfo(addrs);
+        return;
+    } while (0);
+
+    LOG(FATAL, "Cannot parser the firstBadLink string %s, it should be a bug 
or protocol incompatible.",
+        n.c_str());
+    THROW(HdfsException, "Cannot parser the firstBadLink string %s, it should 
be a bug or protocol incompatible.",
+          n.c_str());
+}
+
+void PipelineImpl::createBlockOutputStream(const Token & token, int64_t gs, bool recovery) {
+    std::string firstBadLink;
+    exception_ptr lastError;
+    bool needWrapException = true;
+
+    try {
+        sock = shared_ptr < Socket > (new TcpSocketImpl);
+        reader = shared_ptr<BufferedSocketReader>(new BufferedSocketReaderImpl(*sock));
+        sock->connect(nodes[0].getIpAddr().c_str(), nodes[0].getXferPort(),
+                      connectTimeout);
+        std::vector<DatanodeInfo> targets;
+
+        for (size_t i = 1; i < nodes.size(); ++i) {
+            targets.push_back(nodes[i]);
+        }
+
+        DataTransferProtocolSender sender(*sock, writeTimeout,
+                                          nodes[0].formatAddress());
+        sender.writeBlock(*lastBlock, token, clientName.c_str(), targets,
+                          (recovery ? (stage | 0x1) : stage), nodes.size(),
+                          lastBlock->getNumBytes(), bytesSent, gs, checksumType, chunkSize);
+        int size;
+        size = reader->readVarint32(readTimeout);
+        std::vector<char> buf(size);
+        reader->readFully(&buf[0], size, readTimeout);
+        BlockOpResponseProto resp;
+
+        if (!resp.ParseFromArray(&buf[0], size)) {
+            THROW(HdfsIOException, "cannot parse datanode response from %s for 
block %s.",
+                  nodes[0].formatAddress().c_str(), 
lastBlock->toString().c_str());
+        }
+
+        Status pipelineStatus = resp.status();
+        firstBadLink = resp.firstbadlink();
+
+        if (Status::DT_PROTO_SUCCESS != pipelineStatus) {
+            needWrapException = false;
+
+            if (Status::DT_PROTO_ERROR_ACCESS_TOKEN == pipelineStatus) {
+                THROW(HdfsInvalidBlockToken,
+                      "Got access token error for connect ack with 
firstBadLink as %s for block %s",
+                      firstBadLink.c_str(), lastBlock->toString().c_str());
+            } else {
+                THROW(HdfsIOException, "Bad connect ack with firstBadLink as 
%s for block %s",
+                      firstBadLink.c_str(), lastBlock->toString().c_str());
+            }
+        }
+
+        return;
+    } catch (...) {
+        errorIndex = 0;
+        lastError = current_exception();
+    }
+
+    checkBadLinkFormat(firstBadLink);
+
+    if (!firstBadLink.empty()) {
+        for (size_t i = 0; i < nodes.size(); ++i) {
+            if (nodes[i].getXferAddr() == firstBadLink) {
+                errorIndex = i;
+                break;
+            }
+        }
+    }
+
+    assert(lastError);
+
+    if (!needWrapException) {
+        rethrow_exception(lastError);
+    }
+
+    try {
+        rethrow_exception(lastError);
+    } catch (const HdfsException & e) {
+        NESTED_THROW(HdfsIOException,
+                     "Cannot create block output stream for block %s, "
+                     "recovery flag: %s, with last generate stamp %" PRId64 
".",
+                     lastBlock->toString().c_str(), (recovery ? "true" : 
"false"), gs);
+    }
+}
+
+void PipelineImpl::resend() {
+    assert(stage != PIPELINE_CLOSE);
+
+    for (size_t i = 0; i < packets.size(); ++i) {
+        ConstPacketBuffer b = packets[i]->getBuffer();
+        sock->writeFully(b.getBuffer(), b.getSize(), writeTimeout);
+        int64_t tmp = packets[i]->getLastByteOffsetBlock();
+        bytesSent = bytesSent > tmp ? bytesSent : tmp;
+    }
+}
+
+void PipelineImpl::send(shared_ptr<Packet> packet) {
+    ConstPacketBuffer buffer = packet->getBuffer();
+
+    if (!packet->isHeartbeat()) {
+        packets.push_back(packet);
+    }
+
+    /*
+     * Too many packets are pending on acks; wait to avoid consuming too much memory.
+     */
+    if (static_cast<int>(packets.size()) > packetPool.getMaxSize()) {
+        waitForAcks(false);
+    }
+
+    bool failover = false;
+
+    do {
+        try {
+            if (failover) {
+                resend();
+            } else {
+                assert(sock);
+                sock->writeFully(buffer.getBuffer(), buffer.getSize(),
+                                 writeTimeout);
+                int64_t tmp = packet->getLastByteOffsetBlock();
+                bytesSent = bytesSent > tmp ? bytesSent : tmp;
+            }
+
+            checkResponse(false);
+            return;
+        } catch (const HdfsIOException & e) {
+            if (errorIndex < 0) {
+                errorIndex = 0;
+            }
+
+            sock.reset();
+        }
+
+        buildForAppendOrRecovery(true);
+        failover = true;
+
+        if (stage == PIPELINE_CLOSE) {
+            assert(packets.size() == 1 && packets[0]->isLastPacketInBlock());
+            packets.clear();
+            break;
+        }
+    } while (true);
+}
+
+void PipelineImpl::processAck(PipelineAck & ack) {
+    assert(!ack.isInvalid());
+    int64_t seqno = ack.getSeqno();
+
+    if (HEART_BEAT_SEQNO == seqno) {
+        return;
+    }
+
+    assert(!packets.empty());
+    Packet & packet = *packets[0];
+
+    if (ack.isSuccess()) {
+        if (packet.getSeqno() != seqno) {
+            THROW(HdfsIOException,
+                  "processAck: pipeline ack expecting seqno %" PRId64 "  but 
received %" PRId64 " for block %s.",
+                  packet.getSeqno(), seqno, lastBlock->toString().c_str());
+        }
+
+        int64_t tmp = packet.getLastByteOffsetBlock();
+        bytesAcked = tmp > bytesAcked ? tmp : bytesAcked;
+        assert(lastBlock);
+        lastBlock->setNumBytes(bytesAcked);
+
+        if (packet.isLastPacketInBlock()) {
+            sock.reset();
+        }
+
+        packetPool.relesePacket(packets[0]);
+        packets.pop_front();
+    } else {
+        for (int i = ack.getNumOfReplies() - 1; i >= 0; --i) {
+            if (Status::DT_PROTO_SUCCESS != ack.getReply(i)) {
+                errorIndex = i;
+                /*
+                 * Handle block token expiration the same as HdfsIOException.
+                 */
+                THROW(HdfsIOException,
+                      "processAck: ack report error at node: %s for block %s.",
+                      nodes[i].formatAddress().c_str(), lastBlock->toString().c_str());
+            }
+        }
+    }
+}
+
+void PipelineImpl::processResponse() {
+    PipelineAck ack;
+    std::vector<char> buf;
+    int size = reader->readVarint32(readTimeout);
+    ack.reset();
+    buf.resize(size);
+    reader->readFully(&buf[0], size, readTimeout);
+    ack.readFrom(&buf[0], size);
+
+    if (ack.isInvalid()) {
+        THROW(HdfsIOException,
+              "processAllAcks: get an invalid DataStreamer packet ack for 
block %s",
+              lastBlock->toString().c_str());
+    }
+
+    processAck(ack);
+}
+
+void PipelineImpl::checkResponse(bool wait) {
+    int timeout = wait ? readTimeout : 0;
+    bool readable = reader->poll(timeout);
+
+    if (readable) {
+        processResponse();
+    } else if (wait) {
+        THROW(HdfsIOException, "Timeout when reading response for block %s, 
datanode %s do not response.",
+              lastBlock->toString().c_str(),
+              nodes[0].formatAddress().c_str());
+    }
+}
+
+void PipelineImpl::flush() {
+    waitForAcks(true);
+}
+
+void PipelineImpl::waitForAcks(bool force) {
+    bool failover = false;
+
+    while (!packets.empty()) {
+        /*
+         * Just wait for some acks to avoid consuming too much memory.
+         */
+        if (!force && static_cast<int>(packets.size()) < packetPool.getMaxSize()) {
+            return;
+        }
+
+        try {
+            if (failover) {
+                resend();
+            }
+
+            checkResponse(true);
+            failover = false;
+        } catch (const HdfsIOException & e) {
+            if (errorIndex < 0) {
+                errorIndex = 0;
+            }
+
+            std::string buffer;
+            LOG(LOG_ERROR,
+                "Failed to flush pipeline on datanode %s for block %s file 
%s.\n%s",
+                nodes[errorIndex].formatAddress().c_str(), 
lastBlock->toString().c_str(),
+                path.c_str(), GetExceptionDetail(e, buffer));
+            LOG(INFO, "Rebuild pipeline to flush for block %s file %s.", 
lastBlock->toString().c_str(), path.c_str());
+            sock.reset();
+            failover = true;
+        }
+
+        if (failover) {
+            buildForAppendOrRecovery(true);
+
+            if (stage == PIPELINE_CLOSE) {
+                assert(packets.size() == 1 && packets[0]->isLastPacketInBlock());
+                packets.clear();
+                break;
+            }
+        }
+    }
+}
+
+shared_ptr<LocatedBlock> PipelineImpl::close(shared_ptr<Packet> lastPacket) {
+    waitForAcks(true);
+    lastPacket->setLastPacketInBlock(true);
+    stage = PIPELINE_CLOSE;
+    send(lastPacket);
+    waitForAcks(true);
+    sock.reset();
+    lastBlock->setNumBytes(bytesAcked);
+    LOG(DEBUG2, "close pipeline for file %s, block %s with length %" PRId64,
+        path.c_str(), lastBlock->toString().c_str(),
+        lastBlock->getNumBytes());
+    return lastBlock;
+}
+
+}
+}

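A note on one subtle piece of Pipeline.cpp above: createBlockOutputStream passes (recovery ? (stage | 0x1) : stage) to writeBlock. That only works because the BlockConstructionStage enum (declared in Pipeline.h, next in this commit) lists each recovery stage immediately after its regular stage, so regular stages have even values and OR-ing in the low bit selects the matching recovery stage. A minimal, self-contained sketch of that invariant, using only the values shown in this commit (the toRecoveryStage helper is hypothetical, not part of the code):

#include <cassert>

// Stage values as declared in Pipeline.h: each recovery stage is its
// regular stage plus one, so every regular stage value is even.
enum BlockConstructionStage {
    PIPELINE_SETUP_APPEND = 0,
    PIPELINE_SETUP_APPEND_RECOVERY = 1,
    DATA_STREAMING = 2,
    PIPELINE_SETUP_STREAMING_RECOVERY = 3,
    PIPELINE_CLOSE = 4,
    PIPELINE_CLOSE_RECOVERY = 5,
    PIPELINE_SETUP_CREATE = 6
};

// Hypothetical helper making the "stage | 0x1" trick explicit.
static BlockConstructionStage toRecoveryStage(BlockConstructionStage stage) {
    return static_cast<BlockConstructionStage>(stage | 0x1);
}

int main() {
    assert(toRecoveryStage(PIPELINE_SETUP_APPEND) == PIPELINE_SETUP_APPEND_RECOVERY);
    assert(toRecoveryStage(DATA_STREAMING) == PIPELINE_SETUP_STREAMING_RECOVERY);
    assert(toRecoveryStage(PIPELINE_CLOSE) == PIPELINE_CLOSE_RECOVERY);
    return 0;
}
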
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/Pipeline.h
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/Pipeline.h b/depends/libhdfs3/src/client/Pipeline.h
new file mode 100644
index 0000000..5e30dd6
--- /dev/null
+++ b/depends/libhdfs3/src/client/Pipeline.h
@@ -0,0 +1,203 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _HDFS_LIBHDFS3_CLIENT_PIPELINE_H_
+#define _HDFS_LIBHDFS3_CLIENT_PIPELINE_H_
+
+#include "FileSystemInter.h"
+#include "Memory.h"
+#include "network/BufferedSocketReader.h"
+#include "network/TcpSocket.h"
+#include "Packet.h"
+#include "PacketPool.h"
+#include "PipelineAck.h"
+#include "server/DatanodeInfo.h"
+#include "server/LocatedBlock.h"
+#include "server/Namenode.h"
+#include "SessionConfig.h"
+#include "Thread.h"
+
+#include <vector>
+#include <deque>
+
+namespace Hdfs {
+namespace Internal {
+
+enum BlockConstructionStage {
+    /**
+     * The enum values are always listed as a regular stage followed by its
+     * recovery stage.
+     * Changing this order will break getRecoveryStage.
+     */
+    // pipeline set up for block append
+    PIPELINE_SETUP_APPEND = 0,
+    // pipeline set up for failed PIPELINE_SETUP_APPEND recovery
+    PIPELINE_SETUP_APPEND_RECOVERY = 1,
+    // data streaming
+    DATA_STREAMING = 2,
+    // pipeline setup for failed data streaming recovery
+    PIPELINE_SETUP_STREAMING_RECOVERY = 3,
+    // close the block and pipeline
+    PIPELINE_CLOSE = 4,
+    // Recover a failed PIPELINE_CLOSE
+    PIPELINE_CLOSE_RECOVERY = 5,
+    // pipeline set up for block creation
+    PIPELINE_SETUP_CREATE = 6
+};
+
+static inline const char * StageToString(BlockConstructionStage stage) {
+    switch (stage) {
+    case PIPELINE_SETUP_APPEND:
+        return "PIPELINE_SETUP_APPEND";
+
+    case PIPELINE_SETUP_APPEND_RECOVERY:
+        return "PIPELINE_SETUP_APPEND_RECOVERY";
+
+    case DATA_STREAMING:
+        return "DATA_STREAMING";
+
+    case PIPELINE_SETUP_STREAMING_RECOVERY:
+        return "PIPELINE_SETUP_STREAMING_RECOVERY";
+
+    case PIPELINE_CLOSE:
+        return "PIPELINE_CLOSE";
+
+    case PIPELINE_CLOSE_RECOVERY:
+        return "PIPELINE_CLOSE_RECOVERY";
+
+    case PIPELINE_SETUP_CREATE:
+        return "PIPELINE_SETUP_CREATE";
+
+    default:
+        return "UNKNOWN STAGE";
+    }
+}
+
+class Packet;
+class OutputStreamImpl;
+
+/**
+ * A write pipeline to the datanodes: handles setup, data transfer, close, and failover.
+ */
+class Pipeline {
+public:
+
+    virtual ~Pipeline() {}
+
+    /**
+     * send all data and wait for all acks.
+     */
+    virtual void flush() = 0;
+
+    /**
+     * send LastPacket and close the pipeline.
+     */
+    virtual shared_ptr<LocatedBlock> close(shared_ptr<Packet> lastPacket) = 0;
+
+    /**
+     * send a packet, retry on error until fatal.
+     * @param packet
+     */
+    virtual void send(shared_ptr<Packet> packet) = 0;
+};
+
+class PipelineImpl : public Pipeline {
+public:
+    /**
+     * construct and set up the pipeline for append or for a new block.
+     */
+    PipelineImpl(bool append, const char * path, const SessionConfig & conf,
+                 shared_ptr<FileSystemInter> filesystem, int checksumType, int chunkSize,
+                 int replication, int64_t bytesSent, PacketPool & packetPool,
+                 shared_ptr<LocatedBlock> lastBlock);
+
+    /**
+     * send all data and wait for all acks.
+     */
+    void flush();
+
+    /**
+     * send LastPacket and close the pipeline.
+     */
+    shared_ptr<LocatedBlock> close(shared_ptr<Packet> lastPacket);
+
+    /**
+     * send a packet, retry on error until fatal.
+     * @param packet
+     */
+    void send(shared_ptr<Packet> packet);
+
+private:
+    bool addDatanodeToPipeline(const std::vector<DatanodeInfo> & excludedNodes);
+    void buildForAppendOrRecovery(bool recovery);
+    void buildForNewBlock();
+    void checkPipelineWithReplicas();
+    void checkResponse(bool wait);
+    void createBlockOutputStream(const Token & token, int64_t gs, bool recovery);
+    void locateNextBlock(const std::vector<DatanodeInfo> & excludedNodes);
+    void processAck(PipelineAck & ack);
+    void processResponse();
+    void resend();
+    void waitForAcks(bool force);
+    void transfer(const ExtendedBlock & blk, const DatanodeInfo & src,
+                  const std::vector<DatanodeInfo> & targets,
+                  const Token & token);
+    int findNewDatanode(const std::vector<DatanodeInfo> & original);
+
+private:
+    static void checkBadLinkFormat(const std::string & node);
+
+private:
+    BlockConstructionStage stage;
+    bool canAddDatanode;
+    int blockWriteRetry;
+    int checksumType;
+    int chunkSize;
+    int connectTimeout;
+    int errorIndex;
+    int readTimeout;
+    int replication;
+    int writeTimeout;
+    int64_t bytesAcked; //the number of bytes acknowledged by the pipeline.
+    int64_t bytesSent; //the number of bytes sent to the pipeline.
+    PacketPool & packetPool;
+    shared_ptr<BufferedSocketReader> reader;
+    shared_ptr<FileSystemInter> filesystem;
+    shared_ptr<LocatedBlock> lastBlock;
+    shared_ptr<Socket> sock;
+    std::deque<shared_ptr<Packet> > packets;
+    std::string clientName;
+    std::string path;
+    std::vector<DatanodeInfo> nodes;
+    std::vector<std::string> storageIDs;
+
+};
+
+}
+}
+
+#endif /* _HDFS_LIBHDFS3_CLIENT_PIPELINE_H_ */

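For orientation, a short sketch of how a writer might drive the Pipeline interface declared above. The real wiring lives in OutputStreamImpl, which is not part of this mail, so the function and parameter names below are illustrative only:

#include <vector>

// Illustrative only: drain a block's packets through a pipeline, then close it.
void writeOneBlock(Hdfs::Internal::Pipeline & pipeline,
                   std::vector<Hdfs::Internal::shared_ptr<Hdfs::Internal::Packet> > & packets,
                   Hdfs::Internal::shared_ptr<Hdfs::Internal::Packet> lastPacket) {
    for (size_t i = 0; i < packets.size(); ++i) {
        pipeline.send(packets[i]);            // retries and rebuilds internally until fatal
    }

    pipeline.flush();                         // wait for all outstanding acks

    Hdfs::Internal::shared_ptr<Hdfs::Internal::LocatedBlock> block =
        pipeline.close(lastPacket);           // send the last packet and tear down
    // block->getNumBytes() now reflects the acknowledged block length.
    (void) block;
}
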
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/PipelineAck.h
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/PipelineAck.h b/depends/libhdfs3/src/client/PipelineAck.h
new file mode 100644
index 0000000..3498ca9
--- /dev/null
+++ b/depends/libhdfs3/src/client/PipelineAck.h
@@ -0,0 +1,92 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _HDFS_LIBHDFS3_CLIENT_PIPELINEACK_H_
+#define _HDFS_LIBHDFS3_CLIENT_PIPELINEACK_H_
+
+#include "datatransfer.pb.h"
+
+namespace Hdfs {
+namespace Internal {
+
+class PipelineAck {
+public:
+    PipelineAck() :
+        invalid(true) {
+    }
+
+    PipelineAck(const char * buf, int size) :
+        invalid(false) {
+        readFrom(buf, size);
+    }
+
+    bool isInvalid() {
+        return invalid;
+    }
+
+    int getNumOfReplies() {
+        return proto.status_size();
+    }
+
+    int64_t getSeqno() {
+        return proto.seqno();
+    }
+
+    Status getReply(int i) {
+        return proto.status(i);
+    }
+
+    bool isSuccess() {
+        int size = proto.status_size();
+
+        for (int i = 0; i < size; ++i) {
+            if (Status::DT_PROTO_SUCCESS != proto.status(i)) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    void readFrom(const char * buf, int size) {
+        invalid = !proto.ParseFromArray(buf, size);
+    }
+
+    void reset() {
+        proto.Clear();
+        invalid = true;
+    }
+
+private:
+    PipelineAckProto proto;
+    bool invalid;
+};
+
+}
+}
+
+#endif /* _HDFS_LIBHDFS3_CLIENT_PIPELINEACK_H_ */

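As PipelineImpl::processResponse above shows, each downstream response is a varint32 length prefix followed by a serialized PipelineAckProto. A minimal sketch of reading and checking one ack, assuming a connected BufferedSocketReader and the types from this commit (the readOneAck function itself is illustrative):

#include <vector>

bool readOneAck(Hdfs::Internal::BufferedSocketReader & reader, int readTimeout) {
    using namespace Hdfs::Internal;
    PipelineAck ack;
    int size = reader.readVarint32(readTimeout);   // length prefix
    std::vector<char> buf(size);
    reader.readFully(&buf[0], size, readTimeout);  // serialized PipelineAckProto
    ack.readFrom(&buf[0], size);

    // The caller maps a non-success reply index back to a datanode in the pipeline.
    return !ack.isInvalid() && ack.isSuccess();
}
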
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/ReadShortCircuitInfo.cpp
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/ReadShortCircuitInfo.cpp b/depends/libhdfs3/src/client/ReadShortCircuitInfo.cpp
new file mode 100644
index 0000000..4aea4ee
--- /dev/null
+++ b/depends/libhdfs3/src/client/ReadShortCircuitInfo.cpp
@@ -0,0 +1,364 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "client/DataTransferProtocolSender.h"
+#include "ReadShortCircuitInfo.h"
+#include "server/Datanode.h"
+#include "datatransfer.pb.h"
+#include "Exception.h"
+#include "ExceptionInternal.h"
+#include "network/DomainSocket.h"
+#include "SWCrc32c.h"
+#include "HWCrc32c.h"
+#include "StringUtil.h"
+
+#include <inttypes.h>
+#include <sstream>
+#include <vector>
+
+namespace Hdfs {
+namespace Internal {
+
+ReadShortCircuitFDCacheType
+    ReadShortCircuitInfoBuilder::ReadShortCircuitFDCache;
+BlockLocalPathInfoCacheType
+    ReadShortCircuitInfoBuilder::BlockLocalPathInfoCache;
+
+ReadShortCircuitInfo::~ReadShortCircuitInfo() {
+  try {
+    dataFile.reset();
+    metaFile.reset();
+    ReadShortCircuitInfoBuilder::release(*this);
+  } catch (...) {
+  }
+}
+
+ReadShortCircuitFDHolder::~ReadShortCircuitFDHolder() {
+  if (metafd != -1) {
+    ::close(metafd);
+  }
+
+  if (datafd != -1) {
+    ::close(datafd);
+  }
+}
+
+ReadShortCircuitInfoBuilder::ReadShortCircuitInfoBuilder(
+    const DatanodeInfo& dnInfo, const RpcAuth& auth, const SessionConfig& conf)
+    : dnInfo(dnInfo), auth(auth), conf(conf) {}
+
+shared_ptr<ReadShortCircuitInfo> ReadShortCircuitInfoBuilder::fetchOrCreate(
+    const ExtendedBlock& block, const Token token) {
+  shared_ptr<ReadShortCircuitInfo> retval;
+  ReadShortCircuitInfoKey key(dnInfo.getXferPort(), block.getBlockId(),
+                              block.getPoolId());
+
+  if (conf.isLegacyLocalBlockReader()) {
+    if (auth.getProtocol() != AuthProtocol::NONE) {
+      LOG(WARNING,
+          "Legacy read-shortcircuit only works for simple "
+          "authentication");
+      return shared_ptr<ReadShortCircuitInfo>();
+    }
+
+    BlockLocalPathInfo info = getBlockLocalPathInfo(block, token);
+    assert(block.getBlockId() == info.getBlock().getBlockId() &&
+           block.getPoolId() == info.getBlock().getPoolId());
+
+    if (0 != access(info.getLocalMetaPath(), R_OK)) {
+      invalidBlockLocalPathInfo(block);
+      LOG(WARNING,
+          "Legacy read-shortcircuit is enabled but path:%s is not "
+          "readable.",
+          info.getLocalMetaPath());
+      return shared_ptr<ReadShortCircuitInfo>();
+    }
+
+    retval = createReadShortCircuitInfo(key, info);
+  } else {
+    shared_ptr<ReadShortCircuitFDHolder> fds;
+
+    // find a pair of available file descriptors in the cache.
+    if (ReadShortCircuitFDCache.findAndErase(key, &fds)) {
+      try {
+        LOG(DEBUG1,
+            "Get file descriptors from cache for block %s, cache size %zu",
+            block.toString().c_str(), ReadShortCircuitFDCache.size());
+
+        return createReadShortCircuitInfo(key, fds);
+      } catch (...) {
+        // failed to create file wrapper from fds, retry with new fds.
+      }
+    }
+
+    // create a new one
+    retval = createReadShortCircuitInfo(key, block, token);
+    ReadShortCircuitFDCache.setMaxSize(conf.getMaxFileDescriptorCacheSize());
+  }
+
+  return retval;
+}
+
+void ReadShortCircuitInfoBuilder::release(const ReadShortCircuitInfo& info) {
+  if (info.isValid() && !info.isLegacy()) {
+    ReadShortCircuitFDCache.insert(info.getKey(), info.getFdHolder());
+    LOG(DEBUG1,
+        "Inserted file descriptors into cache for block %s, cache size %zu",
+        info.formatBlockInfo().c_str(), ReadShortCircuitFDCache.size());
+  }
+}
+
+BlockLocalPathInfo ReadShortCircuitInfoBuilder::getBlockLocalPathInfo(
+    const ExtendedBlock& block, const Token& token) {
+  BlockLocalPathInfo retval;
+
+  ReadShortCircuitInfoKey key(dnInfo.getXferPort(), block.getBlockId(),
+                              block.getPoolId());
+
+  try {
+    if (!BlockLocalPathInfoCache.find(key, &retval)) {
+      RpcAuth a = auth;
+      SessionConfig c = conf;
+      c.setRpcMaxRetryOnConnect(1);
+
+      /*
+       * only kerberos based authentication is allowed, do not add
+       * token
+       */
+      shared_ptr<Datanode> dn = shared_ptr<Datanode>(new DatanodeImpl(
+          dnInfo.getIpAddr().c_str(), dnInfo.getIpcPort(), c, a));
+      dn->getBlockLocalPathInfo(block, token, retval);
+
+      BlockLocalPathInfoCache.setMaxSize(conf.getMaxLocalBlockInfoCacheSize());
+      BlockLocalPathInfoCache.insert(key, retval);
+
+      LOG(DEBUG1, "Inserted block %s to local block info cache, cache size 
%zu",
+          block.toString().c_str(), BlockLocalPathInfoCache.size());
+    } else {
+      LOG(DEBUG1,
+          "Get local block info from cache for block %s, cache size %zu",
+          block.toString().c_str(), BlockLocalPathInfoCache.size());
+    }
+  } catch (const HdfsIOException& e) {
+    throw;
+  } catch (const HdfsException& e) {
+    NESTED_THROW(HdfsIOException,
+                 "ReadShortCircuitInfoBuilder: Failed to get block local "
+                 "path information.");
+  }
+
+  return retval;
+}
+
+void ReadShortCircuitInfoBuilder::invalidBlockLocalPathInfo(
+    const ExtendedBlock& block) {
+  BlockLocalPathInfoCache.erase(ReadShortCircuitInfoKey(
+      dnInfo.getXferPort(), block.getBlockId(), block.getPoolId()));
+}
+
+shared_ptr<ReadShortCircuitInfo>
+ReadShortCircuitInfoBuilder::createReadShortCircuitInfo(
+    const ReadShortCircuitInfoKey& key, const BlockLocalPathInfo& info) {
+  shared_ptr<FileWrapper> dataFile;
+  shared_ptr<FileWrapper> metaFile;
+
+  std::string metaFilePath = info.getLocalMetaPath();
+  std::string dataFilePath = info.getLocalBlockPath();
+
+  if (conf.doUseMappedFile()) {
+    metaFile = shared_ptr<MappedFileWrapper>(new MappedFileWrapper);
+    dataFile = shared_ptr<MappedFileWrapper>(new MappedFileWrapper);
+  } else {
+    metaFile = shared_ptr<CFileWrapper>(new CFileWrapper);
+    dataFile = shared_ptr<CFileWrapper>(new CFileWrapper);
+  }
+
+  if (!metaFile->open(metaFilePath)) {
+    THROW(HdfsIOException,
+          "ReadShortCircuitInfoBuilder cannot open metadata file \"%s\", %s",
+          metaFilePath.c_str(), GetSystemErrorInfo(errno));
+  }
+
+  if (!dataFile->open(dataFilePath)) {
+    THROW(HdfsIOException,
+          "ReadShortCircuitInfoBuilder cannot open data file \"%s\", %s",
+          dataFilePath.c_str(), GetSystemErrorInfo(errno));
+  }
+
+  dataFile->seek(0);
+  metaFile->seek(0);
+
+  shared_ptr<ReadShortCircuitInfo> retval(new ReadShortCircuitInfo(key, true));
+  retval->setDataFile(dataFile);
+  retval->setMetaFile(metaFile);
+  return retval;
+}
+
+std::string ReadShortCircuitInfoBuilder::buildDomainSocketAddress(
+    uint32_t port) {
+  std::string domainSocketPath = conf.getDomainSocketPath();
+
+  if (domainSocketPath.empty()) {
+    THROW(HdfsIOException,
+          "ReadShortCircuitInfoBuilder: \"dfs.domain.socket.path\" is not "
+          "set");
+  }
+
+  std::stringstream ss;
+  ss.imbue(std::locale::classic());
+  ss << port;
+  StringReplaceAll(domainSocketPath, "_PORT", ss.str());
+
+  return domainSocketPath;
+}
+
+shared_ptr<ReadShortCircuitInfo>
+ReadShortCircuitInfoBuilder::createReadShortCircuitInfo(
+    const ReadShortCircuitInfoKey& key, const ExtendedBlock& block,
+    const Token& token) {
+  std::string addr = buildDomainSocketAddress(key.dnPort);
+  DomainSocketImpl sock;
+  sock.connect(addr.c_str(), 0, conf.getInputConnTimeout());
+  DataTransferProtocolSender sender(sock, conf.getInputWriteTimeout(), addr);
+  sender.requestShortCircuitFds(block, token, MaxReadShortCircuitVersion);
+  shared_ptr<ReadShortCircuitFDHolder> fds =
+      receiveReadShortCircuitFDs(sock, block);
+  return createReadShortCircuitInfo(key, fds);
+}
+
+shared_ptr<ReadShortCircuitFDHolder>
+ReadShortCircuitInfoBuilder::receiveReadShortCircuitFDs(
+    Socket& sock, const ExtendedBlock& block) {
+  std::vector<char> respBuffer;
+  int readTimeout = conf.getInputReadTimeout();
+  shared_ptr<BufferedSocketReader> in(
+      new BufferedSocketReaderImpl(sock, 0));  // disable buffer
+  int32_t respSize = in->readVarint32(readTimeout);
+
+  if (respSize <= 0 || respSize > 10 * 1024 * 1024) {
+    THROW(HdfsIOException,
+          "ReadShortCircuitInfoBuilder get a invalid response size: %d, "
+          "Block: %s, "
+          "from Datanode: %s",
+          respSize, block.toString().c_str(), dnInfo.formatAddress().c_str());
+  }
+
+  respBuffer.resize(respSize);
+  in->readFully(&respBuffer[0], respSize, readTimeout);
+  BlockOpResponseProto resp;
+
+  if (!resp.ParseFromArray(&respBuffer[0], respBuffer.size())) {
+    THROW(HdfsIOException,
+          "ReadShortCircuitInfoBuilder cannot parse BlockOpResponseProto "
+          "from "
+          "Datanode response, "
+          "Block: %s, from Datanode: %s",
+          block.toString().c_str(), dnInfo.formatAddress().c_str());
+  }
+
+  if (resp.status() != Status::DT_PROTO_SUCCESS) {
+    std::string msg;
+
+    if (resp.has_message()) {
+      msg = resp.message();
+    }
+
+    if (resp.status() == Status::DT_PROTO_ERROR_ACCESS_TOKEN) {
+      THROW(HdfsInvalidBlockToken,
+            "ReadShortCircuitInfoBuilder: block's token is invalid. "
+            "Datanode: %s, Block: %s",
+            dnInfo.formatAddress().c_str(), block.toString().c_str());
+    } else if (resp.status() == Status::DT_PROTO_ERROR_UNSUPPORTED) {
+      THROW(HdfsIOException,
+            "short-circuit read access is disabled for "
+            "DataNode %s. reason: %s",
+            dnInfo.formatAddress().c_str(),
+            (msg.empty() ? "check Datanode's log for more information"
+                         : msg.c_str()));
+    } else {
+      THROW(HdfsIOException,
+            "ReadShortCircuitInfoBuilder: Datanode return an error when "
+            "sending read request to Datanode: %s, Block: %s, %s.",
+            dnInfo.formatAddress().c_str(), block.toString().c_str(),
+            (msg.empty() ? "check Datanode's log for more information"
+                         : msg.c_str()));
+    }
+  }
+
+  DomainSocketImpl* domainSocket = dynamic_cast<DomainSocketImpl*>(&sock);
+
+  if (NULL == domainSocket) {
+    THROW(HdfsIOException, "Read short-circuit only works with Domain Socket");
+  }
+
+  shared_ptr<ReadShortCircuitFDHolder> fds(new ReadShortCircuitFDHolder);
+
+  std::vector<int> tempFds(2, -1);
+  respBuffer.resize(1);
+  domainSocket->receiveFileDescriptors(&tempFds[0], tempFds.size(),
+                                       &respBuffer[0], respBuffer.size());
+
+  assert(tempFds[0] != -1 && "failed to receive data file descriptor");
+  assert(tempFds[1] != -1 && "failed to receive metadata file descriptor");
+
+  fds->datafd = tempFds[0];
+  fds->metafd = tempFds[1];
+
+  return fds;
+}
+
+shared_ptr<ReadShortCircuitInfo>
+ReadShortCircuitInfoBuilder::createReadShortCircuitInfo(
+    const ReadShortCircuitInfoKey& key,
+    const shared_ptr<ReadShortCircuitFDHolder>& fds) {
+  shared_ptr<FileWrapper> dataFile;
+  shared_ptr<FileWrapper> metaFile;
+
+  if (conf.doUseMappedFile()) {
+    metaFile = shared_ptr<MappedFileWrapper>(new MappedFileWrapper);
+    dataFile = shared_ptr<MappedFileWrapper>(new MappedFileWrapper);
+  } else {
+    metaFile = shared_ptr<CFileWrapper>(new CFileWrapper);
+    dataFile = shared_ptr<CFileWrapper>(new CFileWrapper);
+  }
+
+  metaFile->open(fds->metafd, false);
+  dataFile->open(fds->datafd, false);
+
+  dataFile->seek(0);
+  metaFile->seek(0);
+
+  shared_ptr<ReadShortCircuitInfo> retval(new ReadShortCircuitInfo(key, false));
+
+  retval->setFdHolder(fds);
+  retval->setDataFile(dataFile);
+  retval->setMetaFile(metaFile);
+
+  return retval;
+}
+}
+}

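A short sketch of how a local block reader might use the ReadShortCircuitInfoBuilder defined above. The surrounding objects (datanode info, auth, config, block, token) are assumed to come from the caller, and the openLocalBlock function name is illustrative, not part of the commit:

using Hdfs::Internal::DatanodeInfo;
using Hdfs::Internal::ExtendedBlock;
using Hdfs::Internal::ReadShortCircuitInfo;
using Hdfs::Internal::ReadShortCircuitInfoBuilder;
using Hdfs::Internal::RpcAuth;
using Hdfs::Internal::SessionConfig;
using Hdfs::Internal::Token;
using Hdfs::Internal::shared_ptr;

shared_ptr<ReadShortCircuitInfo> openLocalBlock(const DatanodeInfo & dnInfo,
                                                const RpcAuth & auth,
                                                const SessionConfig & conf,
                                                const ExtendedBlock & block,
                                                const Token & token) {
    ReadShortCircuitInfoBuilder builder(dnInfo, auth, conf);
    // Returns cached or freshly negotiated data/meta file descriptors,
    // or an empty pointer when legacy short-circuit read cannot be used.
    shared_ptr<ReadShortCircuitInfo> info = builder.fetchOrCreate(block, token);

    if (info) {
        // info->getDataFile() and info->getMetaFile() wrap the local block files.
    }

    return info;
}
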
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/ReadShortCircuitInfo.h
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/ReadShortCircuitInfo.h b/depends/libhdfs3/src/client/ReadShortCircuitInfo.h
new file mode 100644
index 0000000..dfb98d8
--- /dev/null
+++ b/depends/libhdfs3/src/client/ReadShortCircuitInfo.h
@@ -0,0 +1,193 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _HDFS_LIBHDFS3_SERVER_READSHORTCIRCUITINFO_H_
+#define _HDFS_LIBHDFS3_SERVER_READSHORTCIRCUITINFO_H_
+
+#include "FileWrapper.h"
+#include "Hash.h"
+#include "LruMap.h"
+#include "Memory.h"
+#include "network/Socket.h"
+#include "rpc/RpcAuth.h"
+#include "server/BlockLocalPathInfo.h"
+#include "server/DatanodeInfo.h"
+#include "server/ExtendedBlock.h"
+#include "SessionConfig.h"
+#include "Thread.h"
+#include "Token.h"
+
+namespace Hdfs {
+namespace Internal {
+
+struct ReadShortCircuitInfoKey {
+  ReadShortCircuitInfoKey(uint32_t dnPort, int64_t blockId,
+                          const std::string& bpid)
+      : dnPort(dnPort), blockId(blockId), bpid(bpid) {}
+
+  size_t hash_value() const {
+    size_t values[] = {Int32Hasher(dnPort), Int64Hasher(blockId),
+                       StringHasher(bpid)};
+    return CombineHasher(values, sizeof(values) / sizeof(values[0]));
+  }
+
+  bool operator==(const ReadShortCircuitInfoKey& other) const {
+    return dnPort == other.dnPort && blockId == other.blockId &&
+           bpid == other.bpid;
+  }
+
+  uint32_t dnPort;
+  int64_t blockId;
+  std::string bpid;
+};
+
+struct ReadShortCircuitFDHolder {
+ public:
+  ReadShortCircuitFDHolder() : metafd(-1), datafd(-1) {}
+  ~ReadShortCircuitFDHolder();
+
+  int metafd;
+  int datafd;
+};
+
+class ReadShortCircuitInfo {
+ public:
+  ReadShortCircuitInfo(const ReadShortCircuitInfoKey& key, bool legacy)
+      : legacy(legacy),
+        valid(true),
+        blockId(key.blockId),
+        bpid(key.bpid),
+        dnPort(key.dnPort) {}
+
+  ~ReadShortCircuitInfo();
+
+  const shared_ptr<FileWrapper>& getDataFile() const { return dataFile; }
+
+  void setDataFile(shared_ptr<FileWrapper> dataFile) {
+    this->dataFile = dataFile;
+  }
+
+  const shared_ptr<FileWrapper>& getMetaFile() const { return metaFile; }
+
+  void setMetaFile(shared_ptr<FileWrapper> metaFile) {
+    this->metaFile = metaFile;
+  }
+
+  bool isValid() const { return valid; }
+
+  void setValid(bool valid) { this->valid = valid; }
+
+  int64_t getBlockId() const { return blockId; }
+
+  void setBlockId(int64_t blockId) { this->blockId = blockId; }
+
+  const std::string& getBpid() const { return bpid; }
+
+  void setBpid(const std::string& bpid) { this->bpid = bpid; }
+
+  uint32_t getDnPort() const { return dnPort; }
+
+  void setDnPort(uint32_t dnPort) { this->dnPort = dnPort; }
+
+  ReadShortCircuitInfoKey getKey() const {
+    return ReadShortCircuitInfoKey(dnPort, blockId, bpid);
+  }
+
+  bool isLegacy() const { return legacy; }
+
+  void setLegacy(bool legacy) { this->legacy = legacy; }
+
+  const shared_ptr<ReadShortCircuitFDHolder>& getFdHolder() const {
+    return fdHolder;
+  }
+
+  void setFdHolder(const shared_ptr<ReadShortCircuitFDHolder>& fdHolder) {
+    this->fdHolder = fdHolder;
+  }
+
+  const std::string formatBlockInfo() const {
+    ExtendedBlock block;
+    block.setBlockId(blockId);
+    block.setPoolId(bpid);
+    return block.toString();
+  }
+
+ private:
+  bool legacy;
+  bool valid;
+  shared_ptr<FileWrapper> dataFile;
+  shared_ptr<FileWrapper> metaFile;
+  shared_ptr<ReadShortCircuitFDHolder> fdHolder;
+  int64_t blockId;
+  std::string bpid;
+  uint32_t dnPort;
+};
+
+typedef LruMap<ReadShortCircuitInfoKey, shared_ptr<ReadShortCircuitFDHolder> >
+    ReadShortCircuitFDCacheType;
+typedef LruMap<ReadShortCircuitInfoKey, BlockLocalPathInfo>
+    BlockLocalPathInfoCacheType;
+
+class ReadShortCircuitInfoBuilder {
+ public:
+  ReadShortCircuitInfoBuilder(const DatanodeInfo& dnInfo, const RpcAuth& auth,
+                              const SessionConfig& conf);
+  shared_ptr<ReadShortCircuitInfo> fetchOrCreate(const ExtendedBlock& block,
+                                                 const Token token);
+  static void release(const ReadShortCircuitInfo& info);
+
+ private:
+  BlockLocalPathInfo getBlockLocalPathInfo(const ExtendedBlock& block,
+                                           const Token& token);
+  void invalidBlockLocalPathInfo(const ExtendedBlock& block);
+  shared_ptr<ReadShortCircuitInfo> createReadShortCircuitInfo(
+      const ReadShortCircuitInfoKey& key, const BlockLocalPathInfo& info);
+  shared_ptr<ReadShortCircuitInfo> createReadShortCircuitInfo(
+      const ReadShortCircuitInfoKey& key, const ExtendedBlock& block,
+      const Token& token);
+  shared_ptr<ReadShortCircuitInfo> createReadShortCircuitInfo(
+      const ReadShortCircuitInfoKey& key,
+      const shared_ptr<ReadShortCircuitFDHolder>& fds);
+  std::string buildDomainSocketAddress(uint32_t port);
+  shared_ptr<Socket> getDomainSocketConnection(const std::string& addr);
+  shared_ptr<ReadShortCircuitFDHolder> receiveReadShortCircuitFDs(
+      Socket& sock, const ExtendedBlock& block);
+
+ private:
+  DatanodeInfo dnInfo;
+  RpcAuth auth;
+  SessionConfig conf;
+  static const int MaxReadShortCircuitVersion = 1;
+  static ReadShortCircuitFDCacheType ReadShortCircuitFDCache;
+  static BlockLocalPathInfoCacheType BlockLocalPathInfoCache;
+};
+}
+}
+
+HDFS_HASH_DEFINE(::Hdfs::Internal::ReadShortCircuitInfoKey);
+
+#endif /* _HDFS_LIBHDFS3_SERVER_READSHORTCIRCUITINFO_H_ */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/RemoteBlockReader.cpp
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/RemoteBlockReader.cpp 
b/depends/libhdfs3/src/client/RemoteBlockReader.cpp
new file mode 100644
index 0000000..399fb91
--- /dev/null
+++ b/depends/libhdfs3/src/client/RemoteBlockReader.cpp
@@ -0,0 +1,373 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "BigEndian.h"
+#include "DataTransferProtocolSender.h"
+#include "Exception.h"
+#include "ExceptionInternal.h"
+#include "HWCrc32c.h"
+#include "RemoteBlockReader.h"
+#include "SWCrc32c.h"
+#include "WriteBuffer.h"
+
+#include <inttypes.h>
+#include <string.h>
+#include <vector>
+
+namespace Hdfs {
+namespace Internal {
+
+RemoteBlockReader::RemoteBlockReader(const ExtendedBlock& eb,
+                                     DatanodeInfo& datanode,
+                                     PeerCache& peerCache, int64_t start,
+                                     int64_t len, const Token& token,
+                                     const char* clientName, bool verify,
+                                     SessionConfig& conf)
+    : sentStatus(false),
+      verify(verify),
+      binfo(eb),
+      datanode(datanode),
+      checksumSize(0),
+      chunkSize(0),
+      position(0),
+      size(0),
+      cursor(start),
+      endOffset(len + start),
+      lastSeqNo(-1),
+      peerCache(peerCache) {
+
+    assert(start >= 0);
+    readTimeout = conf.getInputReadTimeout();
+    writeTimeout = conf.getInputWriteTimeout();
+    connTimeout = conf.getInputConnTimeout();
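+    // Reuse a cached connection to the datanode when one is available, then send the read request and validate the response header.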
+    sock = getNextPeer(datanode);
+    in = shared_ptr<BufferedSocketReader>(new BufferedSocketReaderImpl(*sock));
+    sender = shared_ptr<DataTransferProtocol>(new DataTransferProtocolSender(
+        *sock, writeTimeout, datanode.formatAddress()));
+    sender->readBlock(eb, token, clientName, start, len);
+    checkResponse();
+}
+
+RemoteBlockReader::~RemoteBlockReader() {
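+    // Only return the socket to the peer cache if the final read status was sent; otherwise the stream may still contain unread data, so close it.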
+    if (sentStatus) {
+        peerCache.addConnection(sock, datanode);
+    } else {
+        sock->close();
+    }
+}
+
+shared_ptr<Socket> RemoteBlockReader::getNextPeer(const DatanodeInfo& dn) {
+    shared_ptr<Socket> sock;
+    try {
+        sock = peerCache.getConnection(dn);
+
+        if (!sock) {
+            sock = shared_ptr<Socket>(new TcpSocketImpl);
+            sock->connect(dn.getIpAddr().c_str(), dn.getXferPort(),
+                          connTimeout);
+            sock->setNoDelay(true);
+        }
+    } catch (const HdfsTimeoutException & e) {
+        NESTED_THROW(HdfsIOException,
+                     "RemoteBlockReader: Failed to connect to %s",
+                     dn.formatAddress().c_str());
+    }
+
+    return sock;
+}
+
+void RemoteBlockReader::checkResponse() {
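+    // The datanode answers the read request with a varint-length-prefixed BlockOpResponseProto.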
+    std::vector<char> respBuffer;
+    int32_t respSize = in->readVarint32(readTimeout);
+
+    if (respSize <= 0 || respSize > 10 * 1024 * 1024) {
+        THROW(HdfsIOException, "RemoteBlockReader got an invalid response size: %d, Block: %s, from Datanode: %s",
+              respSize, binfo.toString().c_str(), datanode.formatAddress().c_str());
+    }
+
+    respBuffer.resize(respSize);
+    in->readFully(&respBuffer[0], respSize, readTimeout);
+    BlockOpResponseProto resp;
+
+    if (!resp.ParseFromArray(&respBuffer[0], respBuffer.size())) {
+        THROW(HdfsIOException, "RemoteBlockReader cannot parse BlockOpResponseProto from Datanode response, "
+              "Block: %s, from Datanode: %s", binfo.toString().c_str(), datanode.formatAddress().c_str());
+    }
+
+    if (resp.status() != Status::DT_PROTO_SUCCESS) {
+        std::string msg;
+
+        if (resp.has_message()) {
+            msg = resp.message();
+        }
+
+        if (resp.status() == Status::DT_PROTO_ERROR_ACCESS_TOKEN) {
+            THROW(HdfsInvalidBlockToken, "RemoteBlockReader: block's token is invalid. Datanode: %s, Block: %s",
+                  datanode.formatAddress().c_str(), binfo.toString().c_str());
+        } else {
+            THROW(HdfsIOException,
+                  "RemoteBlockReader: Datanode returned an error for the read request sent to Datanode: %s, Block: %s, %s.",
+                  datanode.formatAddress().c_str(), binfo.toString().c_str(),
+                  (msg.empty() ? "check Datanode's log for more information" : msg.c_str()));
+        }
+    }
+
+    const ReadOpChecksumInfoProto & checksumInfo = resp.readopchecksuminfo();
+    const ChecksumProto & cs = checksumInfo.checksum();
+    chunkSize = cs.bytesperchecksum();
+
+    if (chunkSize < 0) {
+        THROW(HdfsIOException,
+              "RemoteBlockReader invalid chunk size: %d, expected range[0, %" 
PRId64 "], Block: %s, from Datanode: %s",
+              chunkSize, binfo.getNumBytes(), binfo.toString().c_str(), 
datanode.formatAddress().c_str());
+    }
+
+    switch (cs.type()) {
+    case ChecksumTypeProto::CHECKSUM_NULL:
+        verify = false;
+        checksumSize = 0;
+        break;
+
+    case ChecksumTypeProto::CHECKSUM_CRC32:
+        THROW(HdfsIOException, "RemoteBlockReader does not support CRC32 checksum, Block: %s, from Datanode: %s",
+              binfo.toString().c_str(), datanode.formatAddress().c_str());
+        break;
+
+    case ChecksumTypeProto::CHECKSUM_CRC32C:
+        if (HWCrc32c::available()) {
+            checksum = shared_ptr<Checksum>(new HWCrc32c());
+        } else {
+            checksum = shared_ptr<Checksum>(new SWCrc32c());
+        }
+
+        checksumSize = sizeof(int32_t);
+        break;
+
+    default:
+        THROW(HdfsIOException, "RemoteBlockReader cannot recognize checksum type: %d, Block: %s, from Datanode: %s",
+              static_cast<int>(cs.type()), binfo.toString().c_str(), datanode.formatAddress().c_str());
+    }
+
+    /*
+     * The offset into the block at which the first packet
+     * will start. This is necessary since reads will align
+     * backwards to a checksum chunk boundary.
+     */
+    int64_t firstChunkOffset = checksumInfo.chunkoffset();
+
+    if (firstChunkOffset < 0 || firstChunkOffset > cursor || firstChunkOffset <= cursor - chunkSize) {
+        THROW(HdfsIOException,
+              "RemoteBlockReader invalid first chunk offset: %" PRId64 ", expected range [0, %" PRId64 "], Block: %s, from Datanode: %s",
+              firstChunkOffset, cursor, binfo.toString().c_str(), datanode.formatAddress().c_str());
+    }
+}
+
+shared_ptr<PacketHeader> RemoteBlockReader::readPacketHeader() {
+    try {
+        shared_ptr<PacketHeader> retval;
+        static const int packetHeaderLen = PacketHeader::GetPkgHeaderSize();
+        std::vector<char> buf(packetHeaderLen);
+
+        if (lastHeader && lastHeader->isLastPacketInBlock()) {
+            THROW(HdfsIOException, "RemoteBlockReader: read over block end from Datanode: %s, Block: %s.",
+                  datanode.formatAddress().c_str(), binfo.toString().c_str());
+        }
+
+        in->readFully(&buf[0], packetHeaderLen, readTimeout);
+        retval = shared_ptr<PacketHeader>(new PacketHeader);
+        retval->readFields(&buf[0], packetHeaderLen);
+        return retval;
+    } catch (const HdfsIOException & e) {
+        NESTED_THROW(HdfsIOException, "RemoteBlockReader: failed to read packet header for Block: %s from Datanode: %s.",
+                     binfo.toString().c_str(), datanode.formatAddress().c_str());
+    }
+}
+
+void RemoteBlockReader::readNextPacket() {
+    assert(position >= size);
+    lastHeader = readPacketHeader();
+    int dataSize = lastHeader->getDataLen();
+    int64_t pendingAhead = 0;
+
+    if (!lastHeader->sanityCheck(lastSeqNo)) {
+        THROW(HdfsIOException, "RemoteBlockReader: Packet failed sanity check for block %s from Datanode %s.",
+              binfo.toString().c_str(), datanode.formatAddress().c_str());
+    }
+
+    assert(dataSize > 0 || lastHeader->getPacketLen() == sizeof(int32_t));
+
+    if (dataSize > 0) {
+        int chunks = (dataSize + chunkSize - 1) / chunkSize;
+        int checksumLen = chunks * checksumSize;
+        size = checksumLen + dataSize;
+        assert(size == lastHeader->getPacketLen() - static_cast<int>(sizeof(int32_t)));
+        buffer.resize(size);
+        in->readFully(&buffer[0], size, readTimeout);
+        lastSeqNo = lastHeader->getSeqno();
+
+        if (lastHeader->getPacketLen() != static_cast<int>(sizeof(int32_t)) + dataSize + checksumLen) {
+            THROW(HdfsIOException, "Invalid Packet, packetLen is %d, dataSize is %d, checksum size is %d",
+                  lastHeader->getPacketLen(), dataSize, checksumLen);
+        }
+
+        if (verify) {
+            verifyChecksum(chunks);
+        }
+
+        /*
+         * skip checksum
+         */
+        position = checksumLen;
+        /*
+         * the first packet we receive may start before the position we requested
+         */
+        pendingAhead = cursor - lastHeader->getOffsetInBlock();
+        pendingAhead = pendingAhead > 0 ? pendingAhead : 0;
+        position += pendingAhead;
+    }
+
+    /*
+     * We have reached the end of the requested range; send the read status
+     * to the datanode once it stops sending data (trailing empty packet).
+     */
+
+    if (cursor + dataSize - pendingAhead >= endOffset && readTrailingEmptyPacket()) {
+        sendStatus();
+    }
+}
+
+bool RemoteBlockReader::readTrailingEmptyPacket() {
+    shared_ptr<PacketHeader> trailingHeader = readPacketHeader();
+
+    if (!trailingHeader->isLastPacketInBlock() || trailingHeader->getDataLen() != 0) {
+        return false;
+    }
+
+    return true;
+}
+
+void RemoteBlockReader::sendStatus() {
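+    // Report the final read status to the datanode: CHECKSUM_OK if checksums were verified, plain SUCCESS otherwise.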
+    ClientReadStatusProto status;
+
+    if (verify) {
+        status.set_status(Status::DT_PROTO_CHECKSUM_OK);
+    } else {
+        status.set_status(Status::DT_PROTO_SUCCESS);
+    }
+
+    WriteBuffer buffer;
+    int size = status.ByteSize();
+    buffer.writeVarint32(size);
+    status.SerializeToArray(buffer.alloc(size), size);
+    sock->writeFully(buffer.getBuffer(0), buffer.getDataSize(0), writeTimeout);
+    sentStatus = true;
+}
+
+void RemoteBlockReader::verifyChecksum(int chunks) {
+    int dataSize = lastHeader->getDataLen();
+    char * pchecksum = &buffer[0];
+    char * pdata = &buffer[0] + (chunks * checksumSize);
+
+    for (int i = 0; i < chunks; ++i) {
+        int size = chunkSize < dataSize ? chunkSize : dataSize;
+        dataSize -= size;
+        checksum->reset();
+        checksum->update(pdata + (i * chunkSize), size);
+        uint32_t result = checksum->getValue();
+        uint32_t target = ReadBigEndian32FromArray(pchecksum + (i * checksumSize));
+
+        if (result != target && size == chunkSize) {
+            THROW(ChecksumException, "RemoteBlockReader: checksum mismatch for Block: %s, on Datanode: %s",
+                  binfo.toString().c_str(), datanode.formatAddress().c_str());
+        }
+    }
+
+    assert(0 == dataSize);
+}
+
+int64_t RemoteBlockReader::available() {
+    return size - position > 0 ? size - position : 0;
+}
+
+int32_t RemoteBlockReader::read(char * buf, int32_t len) {
+    assert(0 != len && NULL != buf);
+
+    if (cursor >= endOffset) {
+        THROW(HdfsIOException, "RemoteBlockReader: read over block end from Datanode: %s, Block: %s.",
+              datanode.formatAddress().c_str(), binfo.toString().c_str());
+    }
+
+    try {
+        if (position >= size) {
+            readNextPacket();
+        }
+
+        int32_t todo = len < size - position ? len : size - position;
+        memcpy(buf, &buffer[position], todo);
+        position += todo;
+        cursor += todo;
+        return todo;
+    } catch (const HdfsTimeoutException & e) {
+        NESTED_THROW(HdfsIOException, "RemoteBlockReader: failed to read Block: %s from Datanode: %s.",
+                     binfo.toString().c_str(), datanode.formatAddress().c_str());
+    } catch (const HdfsNetworkException & e) {
+        NESTED_THROW(HdfsIOException, "RemoteBlockReader: failed to read Block: %s from Datanode: %s.",
+                     binfo.toString().c_str(), datanode.formatAddress().c_str());
+    }
+}
+
+void RemoteBlockReader::skip(int64_t len) {
+    int64_t todo = len;
+    assert(cursor + len <= endOffset);
+
+    try {
+        while (todo > 0) {
+            if (cursor >= endOffset) {
+                THROW(HdfsIOException, "RemoteBlockReader: skip over block end from Datanode: %s, Block: %s.",
+                      datanode.formatAddress().c_str(), binfo.toString().c_str());
+            }
+
+            if (position >= size) {
+                readNextPacket();
+            }
+
+            int batch = size - position;
+            batch = batch < todo ? batch : static_cast<int>(todo);
+            position += batch;
+            cursor += batch;
+            todo -= batch;
+        }
+    } catch (const HdfsTimeoutException & e) {
+        NESTED_THROW(HdfsIOException, "RemoteBlockReader: failed to read Block: %s from Datanode: %s.",
+                     binfo.toString().c_str(), datanode.formatAddress().c_str());
+    } catch (const HdfsNetworkException & e) {
+        NESTED_THROW(HdfsIOException, "RemoteBlockReader: failed to read Block: %s from Datanode: %s.",
+                     binfo.toString().c_str(), datanode.formatAddress().c_str());
+    }
+}
+
+}
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/RemoteBlockReader.h
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/RemoteBlockReader.h 
b/depends/libhdfs3/src/client/RemoteBlockReader.h
new file mode 100644
index 0000000..1b81e7a
--- /dev/null
+++ b/depends/libhdfs3/src/client/RemoteBlockReader.h
@@ -0,0 +1,111 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _HDFS_LIBHDFS3_CLIENT_REMOTEBLOCKREADER_H_
+#define _HDFS_LIBHDFS3_CLIENT_REMOTEBLOCKREADER_H_
+
+#include "BlockReader.h"
+#include "Checksum.h"
+#include "DataTransferProtocol.h"
+#include "Memory.h"
+#include "network/BufferedSocketReader.h"
+#include "network/TcpSocket.h"
+#include "PacketHeader.h"
+#include "PeerCache.h"
+#include "server/DatanodeInfo.h"
+#include "server/LocatedBlocks.h"
+#include "SessionConfig.h"
+
+namespace Hdfs {
+namespace Internal {
+
+class RemoteBlockReader: public BlockReader {
+public:
+    RemoteBlockReader(const ExtendedBlock& eb, DatanodeInfo& datanode,
+                      PeerCache& peerCache, int64_t start, int64_t len,
+                      const Token& token, const char* clientName, bool verify,
+                      SessionConfig& conf);
+
+    ~RemoteBlockReader();
+
+    /**
+     * Get how many bytes can be read without blocking.
+     * @return The number of bytes that can be read without blocking.
+     */
+    virtual int64_t available();
+
+    /**
+     * Read data from the block.
+     * @param buf the buffer to be filled.
+     * @param len the maximum number of bytes to read.
+     * @return the number of bytes filled into the buffer, which may be
+     *  less than len. Returns 0 at the end of the block.
+     */
+    virtual int32_t read(char * buf, int32_t len);
+
+    /**
+     * Move the cursor forward len bytes.
+     * @param len The number of bytes to skip.
+     */
+    virtual void skip(int64_t len);
+
+private:
+    bool readTrailingEmptyPacket();
+    shared_ptr<PacketHeader> readPacketHeader();
+    shared_ptr<Socket> getNextPeer(const DatanodeInfo& dn);
+    void checkResponse();
+    void readNextPacket();
+    void sendStatus();
+    void verifyChecksum(int chunks);
+
+private:
+    bool sentStatus;
+    bool verify; //verify checksum or not.
+    const ExtendedBlock & binfo;
+    DatanodeInfo & datanode;
+    int checksumSize;
+    int chunkSize;
+    int connTimeout;
+    int position; //position in buffer.
+    int readTimeout;
+    int size;  //data size in buffer.
+    int writeTimeout;
+    int64_t cursor; //position in block.
+    int64_t endOffset; //offset in block requested to read to.
+    int64_t lastSeqNo; //seqno of the last packet received
+    PeerCache& peerCache;
+    shared_ptr<BufferedSocketReader> in;
+    shared_ptr<Checksum> checksum;
+    shared_ptr<DataTransferProtocol> sender;
+    shared_ptr<PacketHeader> lastHeader;
+    shared_ptr<Socket> sock;
+    std::vector<char> buffer;
+};
+
+}
+}
+#endif /* _HDFS_LIBHDFS3_CLIENT_REMOTEBLOCKREADER_H_ */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/Token.cpp
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/Token.cpp 
b/depends/libhdfs3/src/client/Token.cpp
new file mode 100644
index 0000000..a0cb8c0
--- /dev/null
+++ b/depends/libhdfs3/src/client/Token.cpp
@@ -0,0 +1,180 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "Exception.h"
+#include "ExceptionInternal.h"
+#include "Hash.h"
+#include "Token.h"
+#include "WritableUtils.h"
+
+#include <gsasl.h>
+
+#include <new>
+#include <stdexcept>
+#include <string.h>
+#include <vector>
+
+using namespace Hdfs::Internal;
+
+namespace Hdfs {
+namespace Internal {
+
+static std::string Base64Encode(const char * input, size_t len) {
+    int rc = 0;
+    size_t outLen;
+    char * output = NULL;
+    std::string retval;
+
+    if (GSASL_OK != (rc = gsasl_base64_to(input, len, &output, &outLen))) {
+        assert(GSASL_MALLOC_ERROR == rc);
+        throw std::bad_alloc();
+    }
+
+    assert(NULL != output);
+    retval = output;
+    gsasl_free(output);
+
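+    // Convert to the URL-safe base64 variant: '+' -> '-', '/' -> '_', and drop the '=' padding.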
+    for (size_t i = 0 ; i < retval.length(); ++i) {
+        switch (retval[i]) {
+        case '+':
+            retval[i] = '-';
+            break;
+
+        case '/':
+            retval[i] = '_';
+            break;
+
+        case '=':
+            retval.resize(i);
+            break;
+
+        default:
+            break;
+        }
+    }
+
+    return retval;
+}
+
+static void Base64Decode(const std::string & urlSafe,
+                         std::vector<char> & buffer) {
+    int retval = 0, append = 0;
+    size_t outLen;
+    char * output = NULL;
+    std::string input = urlSafe;
+
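+    // Map the URL-safe alphabet back to standard base64 before decoding.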
+    for (size_t i = 0; i < input.length(); ++i) {
+        switch (input[i]) {
+        case '-':
+            input[i] = '+';
+            break;
+
+        case '_':
+            input[i] = '/';
+            break;
+
+        default:
+            break;
+        }
+    }
+
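+    // gsasl_base64_from() expects padded input; the padding was stripped during encoding, so retry with up to two '=' appended.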
+    while (true) {
+        retval = gsasl_base64_from(&input[0], input.length(), &output, &outLen);
+
+        if (GSASL_OK != retval) {
+            switch (retval) {
+            case GSASL_BASE64_ERROR:
+                if (append++ < 2) {
+                    input.append("=");
+                    continue;
+                }
+
+                throw std::invalid_argument(
+                    "invalid input of gsasl_base64_from");
+
+            case GSASL_MALLOC_ERROR:
+                throw std::bad_alloc();
+
+            default:
+                assert(
+                    false
+                    && "unexpected return value from gsasl_base64_from");
+            }
+        }
+
+        break;
+    }
+
+    assert(outLen >= 0);
+    buffer.resize(outLen);
+    memcpy(&buffer[0], output, outLen);
+    gsasl_free(output);
+}
+
+std::string Token::toString() const {
+    try {
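+        // Layout: int32 length + identifier bytes, int32 length + password bytes, then kind and service via WriteText; the result is URL-safe base64 encoded.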
+        size_t len = 0;
+        std::vector<char> buffer(1024);
+        WritableUtils out(&buffer[0], buffer.size());
+        len += out.WriteInt32(identifier.size());
+        len += out.WriteRaw(&identifier[0], identifier.size());
+        len += out.WriteInt32(password.size());
+        len += out.WriteRaw(&password[0], password.size());
+        len += out.WriteText(kind);
+        len += out.WriteText(service);
+        return Base64Encode(&buffer[0], len);
+    } catch (...) {
+        NESTED_THROW(HdfsIOException, "cannot convert token to string");
+    }
+}
+
+Token & Token::fromString(const std::string & str) {
+    int32_t len;
+
+    try {
+        std::vector<char> buffer;
+        Base64Decode(str, buffer);
+        WritableUtils in(&buffer[0], buffer.size());
+        len = in.ReadInt32();
+        identifier.resize(len);
+        in.ReadRaw(&identifier[0], len);
+        len = in.ReadInt32();
+        password.resize(len);
+        in.ReadRaw(&password[0], len);
+        kind = in.ReadText();
+        service = in.ReadText();
+        return *this;
+    } catch (...) {
+        NESTED_THROW(HdfsInvalidBlockToken,
+                     "cannot construct a token from the string");
+    }
+}
+
+size_t Token::hash_value() const {
+    size_t values[] = { StringHasher(identifier), StringHasher(password),
+                        StringHasher(kind), StringHasher(service)
+                      };
+    return CombineHasher(values, sizeof(values) / sizeof(values[0]));
+}
+
+}
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/Token.h
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/Token.h 
b/depends/libhdfs3/src/client/Token.h
new file mode 100644
index 0000000..f3301d9
--- /dev/null
+++ b/depends/libhdfs3/src/client/Token.h
@@ -0,0 +1,91 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _HDFS_LIBHDFS3_CLIENT_TOKEN_H_
+#define _HDFS_LIBHDFS3_CLIENT_TOKEN_H_
+
+#include <string>
+
+namespace Hdfs {
+namespace Internal {
+
+class Token {
+public:
+    const std::string & getIdentifier() const {
+        return identifier;
+    }
+
+    void setIdentifier(const std::string & identifier) {
+        this->identifier = identifier;
+    }
+
+    const std::string & getKind() const {
+        return kind;
+    }
+
+    void setKind(const std::string & kind) {
+        this->kind = kind;
+    }
+
+    const std::string & getPassword() const {
+        return password;
+    }
+
+    void setPassword(const std::string & password) {
+        this->password = password;
+    }
+
+    const std::string & getService() const {
+        return service;
+    }
+
+    void setService(const std::string & service) {
+        this->service = service;
+    }
+
+    bool operator ==(const Token & other) const {
+        return identifier == other.identifier && password == other.password
+               && kind == other.kind && service == other.service;
+    }
+
+    std::string toString() const;
+
+    Token & fromString(const std::string & str);
+
+    size_t hash_value() const;
+
+private:
+    std::string identifier;
+    std::string password;
+    std::string kind;
+    std::string service;
+};
+
+}
+}
+
+#endif /* _HDFS_LIBHDFS3_CLIENT_TOKEN_H_ */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/TokenInternal.h
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/TokenInternal.h 
b/depends/libhdfs3/src/client/TokenInternal.h
new file mode 100644
index 0000000..141de25
--- /dev/null
+++ b/depends/libhdfs3/src/client/TokenInternal.h
@@ -0,0 +1,36 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _HDFS_LIBHDFS3_CLIENT_TOKENINTERNAL_H_
+#define _HDFS_LIBHDFS3_CLIENT_TOKENINTERNAL_H_
+
+#include "Hash.h"
+#include "Token.h"
+
+HDFS_HASH_DEFINE(::Hdfs::Internal::Token);
+
+#endif /* _HDFS_LIBHDFS3_CLIENT_TOKENINTERNAL_H_ */

