http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index 0000000,7a40d73..78eaa6c mode 000000,100755..100755 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@@ -1,0 -1,917 +1,982 @@@ + /** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hadoop.hdfs; + + import java.io.FileNotFoundException; + import java.io.IOException; + import java.io.InterruptedIOException; + import java.nio.channels.ClosedChannelException; + import java.util.EnumSet; + import java.util.concurrent.atomic.AtomicReference; + + import org.apache.hadoop.HadoopIllegalArgumentException; + import org.apache.hadoop.classification.InterfaceAudience; + import org.apache.hadoop.crypto.CryptoProtocolVersion; + import org.apache.hadoop.fs.CanSetDropBehind; + import org.apache.hadoop.fs.CreateFlag; + import org.apache.hadoop.fs.FSOutputSummer; + import org.apache.hadoop.fs.FileAlreadyExistsException; + import org.apache.hadoop.fs.FileEncryptionInfo; + import org.apache.hadoop.fs.FileSystem; + import org.apache.hadoop.fs.ParentNotDirectoryException; + import org.apache.hadoop.fs.Syncable; + import org.apache.hadoop.fs.permission.FsPermission; + import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; + import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; + import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag; + import org.apache.hadoop.hdfs.client.impl.DfsClientConf; + import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; + import org.apache.hadoop.hdfs.protocol.DatanodeInfo; + import org.apache.hadoop.hdfs.protocol.ExtendedBlock; + import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; + import org.apache.hadoop.hdfs.protocol.LocatedBlock; + import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException; + import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException; + import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException; + import org.apache.hadoop.hdfs.protocol.UnresolvedPathException; + import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; + import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; + import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; ++import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException; + import org.apache.hadoop.hdfs.server.namenode.RetryStartFileException; + import org.apache.hadoop.hdfs.server.namenode.SafeModeException; + import org.apache.hadoop.hdfs.util.ByteArrayManager; + import org.apache.hadoop.io.EnumSetWritable; + import org.apache.hadoop.ipc.RemoteException; + import org.apache.hadoop.security.AccessControlException; + import org.apache.hadoop.security.token.Token; + import org.apache.hadoop.util.DataChecksum; + import org.apache.hadoop.util.DataChecksum.Type; + import org.apache.hadoop.util.Progressable; + import org.apache.hadoop.util.Time; + import org.apache.htrace.core.TraceScope; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + import com.google.common.annotations.VisibleForTesting; + import com.google.common.base.Preconditions; + + + /**************************************************************** + * DFSOutputStream creates files from a stream of bytes. + * + * The client application writes data that is cached internally by + * this stream. Data is broken up into packets, each packet is + * typically 64K in size. A packet comprises of chunks. Each chunk + * is typically 512 bytes and has an associated checksum with it. + * + * When a client application fills up the currentPacket, it is + * enqueued into the dataQueue of DataStreamer. DataStreamer is a + * thread that picks up packets from the dataQueue and sends it to + * the first datanode in the pipeline. + * + ****************************************************************/ + @InterfaceAudience.Private + public class DFSOutputStream extends FSOutputSummer + implements Syncable, CanSetDropBehind { + static final Logger LOG = LoggerFactory.getLogger(DFSOutputStream.class); + /** + * Number of times to retry creating a file when there are transient + * errors (typically related to encryption zones and KeyProvider operations). + */ + @VisibleForTesting + static final int CREATE_RETRY_COUNT = 10; + @VisibleForTesting + static CryptoProtocolVersion[] SUPPORTED_CRYPTO_VERSIONS = + CryptoProtocolVersion.supported(); + + protected final DFSClient dfsClient; + protected final ByteArrayManager byteArrayManager; + // closed is accessed by different threads under different locks. + protected volatile boolean closed = false; + + protected final String src; + protected final long fileId; + protected final long blockSize; + protected final int bytesPerChecksum; + + protected DFSPacket currentPacket = null; - private DataStreamer streamer; ++ protected DataStreamer streamer; + protected int packetSize = 0; // write packet size, not including the header. + protected int chunksPerPacket = 0; + protected long lastFlushOffset = 0; // offset when flush was invoked + private long initialFileSize = 0; // at time of file open + private final short blockReplication; // replication factor of file + protected boolean shouldSyncBlock = false; // force blocks to disk upon close + protected final AtomicReference<CachingStrategy> cachingStrategy; + private FileEncryptionInfo fileEncryptionInfo; + + /** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/ + protected DFSPacket createPacket(int packetSize, int chunksPerPkt, long offsetInBlock, + long seqno, boolean lastPacketInBlock) throws InterruptedIOException { + final byte[] buf; + final int bufferSize = PacketHeader.PKT_MAX_HEADER_LEN + packetSize; + + try { + buf = byteArrayManager.newByteArray(bufferSize); + } catch (InterruptedException ie) { + final InterruptedIOException iioe = new InterruptedIOException( + "seqno=" + seqno); + iioe.initCause(ie); + throw iioe; + } + + return new DFSPacket(buf, chunksPerPkt, offsetInBlock, seqno, - getChecksumSize(), lastPacketInBlock); ++ getChecksumSize(), lastPacketInBlock); + } + + @Override + protected void checkClosed() throws IOException { + if (isClosed()) { + getStreamer().getLastException().throwException4Close(); + } + } + + // + // returns the list of targets, if any, that is being currently used. + // + @VisibleForTesting + public synchronized DatanodeInfo[] getPipeline() { + if (getStreamer().streamerClosed()) { + return null; + } + DatanodeInfo[] currentNodes = getStreamer().getNodes(); + if (currentNodes == null) { + return null; + } + DatanodeInfo[] value = new DatanodeInfo[currentNodes.length]; + for (int i = 0; i < currentNodes.length; i++) { + value[i] = currentNodes[i]; + } + return value; + } + - /** ++ /** + * @return the object for computing checksum. + * The type is NULL if checksum is not computed. + */ + private static DataChecksum getChecksum4Compute(DataChecksum checksum, + HdfsFileStatus stat) { + if (DataStreamer.isLazyPersist(stat) && stat.getReplication() == 1) { + // do not compute checksum for writing to single replica to memory + return DataChecksum.newDataChecksum(Type.NULL, + checksum.getBytesPerChecksum()); + } + return checksum; + } - ++ + private DFSOutputStream(DFSClient dfsClient, String src, Progressable progress, + HdfsFileStatus stat, DataChecksum checksum) throws IOException { + super(getChecksum4Compute(checksum, stat)); + this.dfsClient = dfsClient; + this.src = src; + this.fileId = stat.getFileId(); + this.blockSize = stat.getBlockSize(); + this.blockReplication = stat.getReplication(); + this.fileEncryptionInfo = stat.getFileEncryptionInfo(); + this.cachingStrategy = new AtomicReference<CachingStrategy>( + dfsClient.getDefaultWriteCachingStrategy()); + if ((progress != null) && DFSClient.LOG.isDebugEnabled()) { + DFSClient.LOG.debug( + "Set non-null progress callback on DFSOutputStream " + src); + } - ++ + this.bytesPerChecksum = checksum.getBytesPerChecksum(); + if (bytesPerChecksum <= 0) { + throw new HadoopIllegalArgumentException( + "Invalid value: bytesPerChecksum = " + bytesPerChecksum + " <= 0"); + } + if (blockSize % bytesPerChecksum != 0) { + throw new HadoopIllegalArgumentException("Invalid values: " + + HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY + " (=" + bytesPerChecksum + + ") must divide block size (=" + blockSize + ")."); + } + this.byteArrayManager = dfsClient.getClientContext().getByteArrayManager(); + } + + /** Construct a new output stream for creating a file. */ + protected DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat, + EnumSet<CreateFlag> flag, Progressable progress, - DataChecksum checksum, String[] favoredNodes) throws IOException { ++ DataChecksum checksum, String[] favoredNodes, boolean createStreamer) ++ throws IOException { + this(dfsClient, src, progress, stat, checksum); + this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK); + + computePacketChunkSize(dfsClient.getConf().getWritePacketSize(), bytesPerChecksum); + - streamer = new DataStreamer(stat, null, dfsClient, src, progress, checksum, - cachingStrategy, byteArrayManager, favoredNodes); ++ if (createStreamer) { ++ streamer = new DataStreamer(stat, null, dfsClient, src, progress, ++ checksum, cachingStrategy, byteArrayManager, favoredNodes); ++ } + } + + static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src, + FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent, + short replication, long blockSize, Progressable progress, int buffersize, + DataChecksum checksum, String[] favoredNodes) throws IOException { + TraceScope scope = + dfsClient.newPathTraceScope("newStreamForCreate", src); + try { + HdfsFileStatus stat = null; + + // Retry the create if we get a RetryStartFileException up to a maximum + // number of times + boolean shouldRetry = true; + int retryCount = CREATE_RETRY_COUNT; + while (shouldRetry) { + shouldRetry = false; + try { + stat = dfsClient.namenode.create(src, masked, dfsClient.clientName, + new EnumSetWritable<CreateFlag>(flag), createParent, replication, + blockSize, SUPPORTED_CRYPTO_VERSIONS); + break; + } catch (RemoteException re) { + IOException e = re.unwrapRemoteException( + AccessControlException.class, + DSQuotaExceededException.class, + QuotaByStorageTypeExceededException.class, + FileAlreadyExistsException.class, + FileNotFoundException.class, + ParentNotDirectoryException.class, + NSQuotaExceededException.class, + RetryStartFileException.class, + SafeModeException.class, + UnresolvedPathException.class, + SnapshotAccessControlException.class, + UnknownCryptoProtocolVersionException.class); + if (e instanceof RetryStartFileException) { + if (retryCount > 0) { + shouldRetry = true; + retryCount--; + } else { + throw new IOException("Too many retries because of encryption" + + " zone operations", e); + } + } else { + throw e; + } + } + } + Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!"); - final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat, - flag, progress, checksum, favoredNodes); ++ final DFSOutputStream out; ++ if(stat.getErasureCodingPolicy() != null) { ++ out = new DFSStripedOutputStream(dfsClient, src, stat, ++ flag, progress, checksum, favoredNodes); ++ } else { ++ out = new DFSOutputStream(dfsClient, src, stat, ++ flag, progress, checksum, favoredNodes, true); ++ } + out.start(); + return out; + } finally { + scope.close(); + } + } + + /** Construct a new output stream for append. */ + private DFSOutputStream(DFSClient dfsClient, String src, + EnumSet<CreateFlag> flags, Progressable progress, LocatedBlock lastBlock, + HdfsFileStatus stat, DataChecksum checksum, String[] favoredNodes) - throws IOException { ++ throws IOException { + this(dfsClient, src, progress, stat, checksum); + initialFileSize = stat.getLen(); // length of file when opened + this.shouldSyncBlock = flags.contains(CreateFlag.SYNC_BLOCK); + + boolean toNewBlock = flags.contains(CreateFlag.NEW_BLOCK); + + this.fileEncryptionInfo = stat.getFileEncryptionInfo(); + + // The last partial block of the file has to be filled. + if (!toNewBlock && lastBlock != null) { + // indicate that we are appending to an existing block + streamer = new DataStreamer(lastBlock, stat, dfsClient, src, progress, checksum, + cachingStrategy, byteArrayManager); + getStreamer().setBytesCurBlock(lastBlock.getBlockSize()); + adjustPacketChunkSize(stat); + getStreamer().setPipelineInConstruction(lastBlock); + } else { + computePacketChunkSize(dfsClient.getConf().getWritePacketSize(), + bytesPerChecksum); + streamer = new DataStreamer(stat, lastBlock != null ? lastBlock.getBlock() : null, + dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager, + favoredNodes); + } + } + + private void adjustPacketChunkSize(HdfsFileStatus stat) throws IOException{ + + long usedInLastBlock = stat.getLen() % blockSize; + int freeInLastBlock = (int)(blockSize - usedInLastBlock); + + // calculate the amount of free space in the pre-existing + // last crc chunk + int usedInCksum = (int)(stat.getLen() % bytesPerChecksum); + int freeInCksum = bytesPerChecksum - usedInCksum; + + // if there is space in the last block, then we have to + // append to that block + if (freeInLastBlock == blockSize) { + throw new IOException("The last block for file " + + src + " is full."); + } + + if (usedInCksum > 0 && freeInCksum > 0) { + // if there is space in the last partial chunk, then + // setup in such a way that the next packet will have only + // one chunk that fills up the partial chunk. + // + computePacketChunkSize(0, freeInCksum); + setChecksumBufSize(freeInCksum); + getStreamer().setAppendChunk(true); + } else { + // if the remaining space in the block is smaller than + // that expected size of of a packet, then create + // smaller size packet. + // + computePacketChunkSize( + Math.min(dfsClient.getConf().getWritePacketSize(), freeInLastBlock), + bytesPerChecksum); + } + } + + static DFSOutputStream newStreamForAppend(DFSClient dfsClient, String src, + EnumSet<CreateFlag> flags, int bufferSize, Progressable progress, + LocatedBlock lastBlock, HdfsFileStatus stat, DataChecksum checksum, + String[] favoredNodes) throws IOException { + TraceScope scope = + dfsClient.newPathTraceScope("newStreamForAppend", src); ++ if(stat.getErasureCodingPolicy() != null) { ++ throw new IOException("Not support appending to a striping layout file yet."); ++ } + try { + final DFSOutputStream out = new DFSOutputStream(dfsClient, src, flags, + progress, lastBlock, stat, checksum, favoredNodes); + out.start(); + return out; + } finally { + scope.close(); + } + } + + protected void computePacketChunkSize(int psize, int csize) { + final int bodySize = psize - PacketHeader.PKT_MAX_HEADER_LEN; + final int chunkSize = csize + getChecksumSize(); + chunksPerPacket = Math.max(bodySize/chunkSize, 1); + packetSize = chunkSize*chunksPerPacket; + if (DFSClient.LOG.isDebugEnabled()) { + DFSClient.LOG.debug("computePacketChunkSize: src=" + src + + ", chunkSize=" + chunkSize + + ", chunksPerPacket=" + chunksPerPacket + + ", packetSize=" + packetSize); + } + } + + protected TraceScope createWriteTraceScope() { + return dfsClient.newPathTraceScope("DFSOutputStream#write", src); + } + + // @see FSOutputSummer#writeChunk() + @Override + protected synchronized void writeChunk(byte[] b, int offset, int len, + byte[] checksum, int ckoff, int cklen) throws IOException { + dfsClient.checkOpen(); + checkClosed(); + + if (len > bytesPerChecksum) { + throw new IOException("writeChunk() buffer size is " + len + + " is larger than supported bytesPerChecksum " + + bytesPerChecksum); + } + if (cklen != 0 && cklen != getChecksumSize()) { + throw new IOException("writeChunk() checksum size is supposed to be " + + getChecksumSize() + " but found to be " + cklen); + } + + if (currentPacket == null) { + currentPacket = createPacket(packetSize, chunksPerPacket, getStreamer() + .getBytesCurBlock(), getStreamer().getAndIncCurrentSeqno(), false); - if (DFSClient.LOG.isDebugEnabled()) { - DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" + ++ if (LOG.isDebugEnabled()) { ++ LOG.debug("WriteChunk allocating new packet seqno=" + + currentPacket.getSeqno() + + ", src=" + src + + ", packetSize=" + packetSize + + ", chunksPerPacket=" + chunksPerPacket + - ", bytesCurBlock=" + getStreamer().getBytesCurBlock()); ++ ", bytesCurBlock=" + getStreamer().getBytesCurBlock() + ", " + this); + } + } + + currentPacket.writeChecksum(checksum, ckoff, cklen); + currentPacket.writeData(b, offset, len); + currentPacket.incNumChunks(); + getStreamer().incBytesCurBlock(len); + + // If packet is full, enqueue it for transmission - // + if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() || + getStreamer().getBytesCurBlock() == blockSize) { + enqueueCurrentPacketFull(); + } + } + + void enqueueCurrentPacket() throws IOException { + getStreamer().waitAndQueuePacket(currentPacket); + currentPacket = null; + } + + void enqueueCurrentPacketFull() throws IOException { + LOG.debug("enqueue full {}, src={}, bytesCurBlock={}, blockSize={}," - + " appendChunk={}, {}", currentPacket, src, getStreamer() - .getBytesCurBlock(), blockSize, getStreamer().getAppendChunk(), ++ + " appendChunk={}, {}", currentPacket, src, getStreamer() ++ .getBytesCurBlock(), blockSize, getStreamer().getAppendChunk(), + getStreamer()); + enqueueCurrentPacket(); + adjustChunkBoundary(); + endBlock(); + } + + /** create an empty packet to mark the end of the block. */ + void setCurrentPacketToEmpty() throws InterruptedIOException { + currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(), + getStreamer().getAndIncCurrentSeqno(), true); + currentPacket.setSyncBlock(shouldSyncBlock); + } + + /** + * If the reopened file did not end at chunk boundary and the above + * write filled up its partial chunk. Tell the summer to generate full + * crc chunks from now on. + */ + protected void adjustChunkBoundary() { + if (getStreamer().getAppendChunk() && + getStreamer().getBytesCurBlock() % bytesPerChecksum == 0) { + getStreamer().setAppendChunk(false); + resetChecksumBufSize(); + } + + if (!getStreamer().getAppendChunk()) { + int psize = Math.min((int)(blockSize- getStreamer().getBytesCurBlock()), + dfsClient.getConf().getWritePacketSize()); + computePacketChunkSize(psize, bytesPerChecksum); + } + } + + /** + * if encountering a block boundary, send an empty packet to + * indicate the end of block and reset bytesCurBlock. + * + * @throws IOException + */ - protected void endBlock() throws IOException { ++ void endBlock() throws IOException { + if (getStreamer().getBytesCurBlock() == blockSize) { + setCurrentPacketToEmpty(); + enqueueCurrentPacket(); + getStreamer().setBytesCurBlock(0); + lastFlushOffset = 0; + } + } - ++ + /** + * Flushes out to all replicas of the block. The data is in the buffers + * of the DNs but not necessarily in the DN's OS buffers. + * + * It is a synchronous operation. When it returns, + * it guarantees that flushed data become visible to new readers. + * It is not guaranteed that data has been flushed to + * persistent store on the datanode. + * Block allocations are persisted on namenode. + */ + @Override + public void hflush() throws IOException { + TraceScope scope = + dfsClient.newPathTraceScope("hflush", src); + try { + flushOrSync(false, EnumSet.noneOf(SyncFlag.class)); + } finally { + scope.close(); + } + } + + @Override + public void hsync() throws IOException { + TraceScope scope = + dfsClient.newPathTraceScope("hsync", src); + try { + flushOrSync(true, EnumSet.noneOf(SyncFlag.class)); + } finally { + scope.close(); + } + } - ++ + /** + * The expected semantics is all data have flushed out to all replicas + * and all replicas have done posix fsync equivalent - ie the OS has + * flushed it to the disk device (but the disk may have it in its cache). - * ++ * + * Note that only the current block is flushed to the disk device. + * To guarantee durable sync across block boundaries the stream should + * be created with {@link CreateFlag#SYNC_BLOCK}. - * ++ * + * @param syncFlags + * Indicate the semantic of the sync. Currently used to specify + * whether or not to update the block length in NameNode. + */ + public void hsync(EnumSet<SyncFlag> syncFlags) throws IOException { + TraceScope scope = + dfsClient.newPathTraceScope("hsync", src); + try { + flushOrSync(true, syncFlags); + } finally { + scope.close(); + } + } + + /** + * Flush/Sync buffered data to DataNodes. - * ++ * + * @param isSync + * Whether or not to require all replicas to flush data to the disk + * device + * @param syncFlags + * Indicate extra detailed semantic of the flush/sync. Currently + * mainly used to specify whether or not to update the file length in + * the NameNode + * @throws IOException + */ + private void flushOrSync(boolean isSync, EnumSet<SyncFlag> syncFlags) + throws IOException { + dfsClient.checkOpen(); + checkClosed(); + try { + long toWaitFor; + long lastBlockLength = -1L; + boolean updateLength = syncFlags.contains(SyncFlag.UPDATE_LENGTH); + boolean endBlock = syncFlags.contains(SyncFlag.END_BLOCK); + synchronized (this) { + // flush checksum buffer, but keep checksum buffer intact if we do not + // need to end the current block + int numKept = flushBuffer(!endBlock, true); + // bytesCurBlock potentially incremented if there was buffered data + + if (DFSClient.LOG.isDebugEnabled()) { + DFSClient.LOG.debug("DFSClient flush(): " + + " bytesCurBlock=" + getStreamer().getBytesCurBlock() + + " lastFlushOffset=" + lastFlushOffset + + " createNewBlock=" + endBlock); + } + // Flush only if we haven't already flushed till this offset. + if (lastFlushOffset != getStreamer().getBytesCurBlock()) { + assert getStreamer().getBytesCurBlock() > lastFlushOffset; + // record the valid offset of this flush + lastFlushOffset = getStreamer().getBytesCurBlock(); + if (isSync && currentPacket == null && !endBlock) { + // Nothing to send right now, + // but sync was requested. + // Send an empty packet if we do not end the block right now + currentPacket = createPacket(packetSize, chunksPerPacket, + getStreamer().getBytesCurBlock(), getStreamer() + .getAndIncCurrentSeqno(), false); + } + } else { + if (isSync && getStreamer().getBytesCurBlock() > 0 && !endBlock) { + // Nothing to send right now, + // and the block was partially written, + // and sync was requested. + // So send an empty sync packet if we do not end the block right + // now + currentPacket = createPacket(packetSize, chunksPerPacket, + getStreamer().getBytesCurBlock(), getStreamer() + .getAndIncCurrentSeqno(), false); + } else if (currentPacket != null) { + // just discard the current packet since it is already been sent. + currentPacket.releaseBuffer(byteArrayManager); + currentPacket = null; + } + } + if (currentPacket != null) { + currentPacket.setSyncBlock(isSync); + enqueueCurrentPacket(); + } + if (endBlock && getStreamer().getBytesCurBlock() > 0) { + // Need to end the current block, thus send an empty packet to + // indicate this is the end of the block and reset bytesCurBlock + currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(), + getStreamer().getAndIncCurrentSeqno(), true); + currentPacket.setSyncBlock(shouldSyncBlock || isSync); + enqueueCurrentPacket(); + getStreamer().setBytesCurBlock(0); + lastFlushOffset = 0; + } else { + // Restore state of stream. Record the last flush offset + // of the last full chunk that was flushed. + getStreamer().setBytesCurBlock( + getStreamer().getBytesCurBlock() - numKept); + } + + toWaitFor = getStreamer().getLastQueuedSeqno(); + } // end synchronized + + getStreamer().waitForAckedSeqno(toWaitFor); + + // update the block length first time irrespective of flag + if (updateLength || getStreamer().getPersistBlocks().get()) { + synchronized (this) { + if (!getStreamer().streamerClosed() + && getStreamer().getBlock() != null) { + lastBlockLength = getStreamer().getBlock().getNumBytes(); + } + } + } + // If 1) any new blocks were allocated since the last flush, or 2) to + // update length in NN is required, then persist block locations on + // namenode. + if (getStreamer().getPersistBlocks().getAndSet(false) || updateLength) { + try { + dfsClient.namenode.fsync(src, fileId, dfsClient.clientName, + lastBlockLength); + } catch (IOException ioe) { + DFSClient.LOG.warn("Unable to persist blocks in hflush for " + src, ioe); + // If we got an error here, it might be because some other thread called + // close before our hflush completed. In that case, we should throw an + // exception that the stream is closed. + checkClosed(); + // If we aren't closed but failed to sync, we should expose that to the + // caller. + throw ioe; + } + } + + synchronized(this) { + if (!getStreamer().streamerClosed()) { + getStreamer().setHflush(); + } + } + } catch (InterruptedIOException interrupt) { + // This kind of error doesn't mean that the stream itself is broken - just the + // flushing thread got interrupted. So, we shouldn't close down the writer, + // but instead just propagate the error + throw interrupt; + } catch (IOException e) { + DFSClient.LOG.warn("Error while syncing", e); + synchronized (this) { + if (!isClosed()) { + getStreamer().getLastException().set(e); + closeThreads(true); + } + } + throw e; + } + } + + /** + * @deprecated use {@link HdfsDataOutputStream#getCurrentBlockReplication()}. + */ + @Deprecated + public synchronized int getNumCurrentReplicas() throws IOException { + return getCurrentBlockReplication(); + } + + /** + * Note that this is not a public API; + * use {@link HdfsDataOutputStream#getCurrentBlockReplication()} instead. - * ++ * + * @return the number of valid replicas of the current block + */ + public synchronized int getCurrentBlockReplication() throws IOException { + dfsClient.checkOpen(); + checkClosed(); + if (getStreamer().streamerClosed()) { + return blockReplication; // no pipeline, return repl factor of file + } + DatanodeInfo[] currentNodes = getStreamer().getNodes(); + if (currentNodes == null) { + return blockReplication; // no pipeline, return repl factor of file + } + return currentNodes.length; + } - ++ + /** + * Waits till all existing data is flushed and confirmations + * received from datanodes. + */ + protected void flushInternal() throws IOException { + long toWaitFor; + synchronized (this) { + dfsClient.checkOpen(); + checkClosed(); + // + // If there is data in the current buffer, send it across + // + getStreamer().queuePacket(currentPacket); + currentPacket = null; + toWaitFor = getStreamer().getLastQueuedSeqno(); + } + + getStreamer().waitForAckedSeqno(toWaitFor); + } + + protected synchronized void start() { + getStreamer().start(); + } - ++ + /** + * Aborts this output stream and releases any system + * resources associated with this stream. + */ + synchronized void abort() throws IOException { + if (isClosed()) { + return; + } + getStreamer().getLastException().set(new IOException("Lease timeout of " + + (dfsClient.getConf().getHdfsTimeout()/1000) + " seconds expired.")); + closeThreads(true); + dfsClient.endFileLease(fileId); + } + + boolean isClosed() { + return closed || getStreamer().streamerClosed(); + } + + void setClosed() { + closed = true; + getStreamer().release(); + } + + // shutdown datastreamer and responseprocessor threads. + // interrupt datastreamer if force is true + protected void closeThreads(boolean force) throws IOException { + try { + getStreamer().close(force); + getStreamer().join(); + getStreamer().closeSocket(); + } catch (InterruptedException e) { + throw new IOException("Failed to shutdown streamer"); + } finally { + getStreamer().setSocketToNull(); + setClosed(); + } + } - ++ + /** + * Closes this output stream and releases any system + * resources associated with this stream. + */ + @Override + public synchronized void close() throws IOException { + TraceScope scope = + dfsClient.newPathTraceScope("DFSOutputStream#close", src); + try { + closeImpl(); + } finally { + scope.close(); + } + } + + protected synchronized void closeImpl() throws IOException { + if (isClosed()) { + getStreamer().getLastException().check(true); + return; + } + + try { + flushBuffer(); // flush from all upper layers + + if (currentPacket != null) { + enqueueCurrentPacket(); + } + + if (getStreamer().getBytesCurBlock() != 0) { + setCurrentPacketToEmpty(); + } + + flushInternal(); // flush all data to Datanodes + // get last block before destroying the streamer + ExtendedBlock lastBlock = getStreamer().getBlock(); + closeThreads(false); + TraceScope scope = dfsClient.getTracer().newScope("completeFile"); + try { + completeFile(lastBlock); + } finally { + scope.close(); + } + dfsClient.endFileLease(fileId); + } catch (ClosedChannelException e) { + } finally { + setClosed(); + } + } + + // should be called holding (this) lock since setTestFilename() may + // be called during unit tests + protected void completeFile(ExtendedBlock last) throws IOException { + long localstart = Time.monotonicNow(); + final DfsClientConf conf = dfsClient.getConf(); + long sleeptime = conf.getBlockWriteLocateFollowingInitialDelayMs(); + boolean fileComplete = false; + int retries = conf.getNumBlockWriteLocateFollowingRetry(); + while (!fileComplete) { + fileComplete = + dfsClient.namenode.complete(src, dfsClient.clientName, last, fileId); + if (!fileComplete) { + final int hdfsTimeout = conf.getHdfsTimeout(); + if (!dfsClient.clientRunning + || (hdfsTimeout > 0 + && localstart + hdfsTimeout < Time.monotonicNow())) { + String msg = "Unable to close file because dfsclient " + + " was unable to contact the HDFS servers." + + " clientRunning " + dfsClient.clientRunning + + " hdfsTimeout " + hdfsTimeout; + DFSClient.LOG.info(msg); + throw new IOException(msg); + } + try { + if (retries == 0) { + throw new IOException("Unable to close file because the last block" + + " does not have enough number of replicas."); + } + retries--; + Thread.sleep(sleeptime); + sleeptime *= 2; + if (Time.monotonicNow() - localstart > 5000) { + DFSClient.LOG.info("Could not complete " + src + " retrying..."); + } + } catch (InterruptedException ie) { + DFSClient.LOG.warn("Caught exception ", ie); + } + } + } + } + + @VisibleForTesting + public void setArtificialSlowdown(long period) { + getStreamer().setArtificialSlowdown(period); + } + + @VisibleForTesting + public synchronized void setChunksPerPacket(int value) { + chunksPerPacket = Math.min(chunksPerPacket, value); + packetSize = (bytesPerChecksum + getChecksumSize()) * chunksPerPacket; + } + + /** + * Returns the size of a file as it was when this stream was opened + */ + public long getInitialLen() { + return initialFileSize; + } + + /** + * @return the FileEncryptionInfo for this stream, or null if not encrypted. + */ + public FileEncryptionInfo getFileEncryptionInfo() { + return fileEncryptionInfo; + } + + /** + * Returns the access token currently used by streamer, for testing only + */ + synchronized Token<BlockTokenIdentifier> getBlockToken() { + return getStreamer().getBlockToken(); + } + + @Override + public void setDropBehind(Boolean dropBehind) throws IOException { + CachingStrategy prevStrategy, nextStrategy; + // CachingStrategy is immutable. So build a new CachingStrategy with the + // modifications we want, and compare-and-swap it in. + do { + prevStrategy = this.cachingStrategy.get(); + nextStrategy = new CachingStrategy.Builder(prevStrategy). - setDropBehind(dropBehind).build(); ++ setDropBehind(dropBehind).build(); + } while (!this.cachingStrategy.compareAndSet(prevStrategy, nextStrategy)); + } + + @VisibleForTesting + ExtendedBlock getBlock() { + return getStreamer().getBlock(); + } + + @VisibleForTesting + public long getFileId() { + return fileId; + } + + /** + * Return the source of stream. + */ + String getSrc() { + return src; + } + + /** + * Returns the data streamer object. + */ + protected DataStreamer getStreamer() { + return streamer; + } ++ ++ @Override ++ public String toString() { ++ return getClass().getSimpleName() + ":" + streamer; ++ } ++ ++ static LocatedBlock addBlock(DatanodeInfo[] excludedNodes, DFSClient dfsClient, ++ String src, ExtendedBlock prevBlock, long fileId, String[] favoredNodes) ++ throws IOException { ++ final DfsClientConf conf = dfsClient.getConf(); ++ int retries = conf.getNumBlockWriteLocateFollowingRetry(); ++ long sleeptime = conf.getBlockWriteLocateFollowingInitialDelayMs(); ++ long localstart = Time.monotonicNow(); ++ while (true) { ++ try { ++ return dfsClient.namenode.addBlock(src, dfsClient.clientName, prevBlock, ++ excludedNodes, fileId, favoredNodes); ++ } catch (RemoteException e) { ++ IOException ue = e.unwrapRemoteException(FileNotFoundException.class, ++ AccessControlException.class, ++ NSQuotaExceededException.class, ++ DSQuotaExceededException.class, ++ QuotaByStorageTypeExceededException.class, ++ UnresolvedPathException.class); ++ if (ue != e) { ++ throw ue; // no need to retry these exceptions ++ } ++ if (NotReplicatedYetException.class.getName().equals(e.getClassName())) { ++ if (retries == 0) { ++ throw e; ++ } else { ++ --retries; ++ LOG.info("Exception while adding a block", e); ++ long elapsed = Time.monotonicNow() - localstart; ++ if (elapsed > 5000) { ++ LOG.info("Waiting for replication for " + (elapsed / 1000) ++ + " seconds"); ++ } ++ try { ++ LOG.warn("NotReplicatedYetException sleeping " + src ++ + " retries left " + retries); ++ Thread.sleep(sleeptime); ++ sleeptime *= 2; ++ } catch (InterruptedException ie) { ++ LOG.warn("Caught exception", ie); ++ } ++ } ++ } else { ++ throw e; ++ } ++ } ++ } ++ } + }
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java index 0000000,9a8ca6f..191691b mode 000000,100755..100755 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java @@@ -1,0 -1,350 +1,364 @@@ + /** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hadoop.hdfs; + + import java.io.DataOutputStream; + import java.io.IOException; + import java.nio.BufferOverflowException; ++import java.nio.ByteBuffer; + import java.nio.channels.ClosedChannelException; + import java.util.Arrays; + + import org.apache.hadoop.classification.InterfaceAudience; + import org.apache.hadoop.hdfs.protocol.HdfsConstants; + import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; + import org.apache.hadoop.hdfs.util.ByteArrayManager; + import org.apache.htrace.core.Span; + import org.apache.htrace.core.SpanId; + import org.apache.htrace.core.TraceScope; + + /**************************************************************** + * DFSPacket is used by DataStreamer and DFSOutputStream. + * DFSOutputStream generates packets and then ask DatStreamer + * to send them to datanodes. + ****************************************************************/ + + @InterfaceAudience.Private -class DFSPacket { ++public class DFSPacket { + public static final long HEART_BEAT_SEQNO = -1L; + private static SpanId[] EMPTY = new SpanId[0]; + private final long seqno; // sequence number of buffer in block + private final long offsetInBlock; // offset in block + private boolean syncBlock; // this packet forces the current block to disk + private int numChunks; // number of chunks currently in packet + private final int maxChunks; // max chunks in packet + private byte[] buf; + private final boolean lastPacketInBlock; // is this the last packet in block? + + /** + * buf is pointed into like follows: + * (C is checksum data, D is payload data) + * + * [_________CCCCCCCCC________________DDDDDDDDDDDDDDDD___] + * ^ ^ ^ ^ + * | checksumPos dataStart dataPos + * checksumStart + * + * Right before sending, we move the checksum data to immediately precede + * the actual data, and then insert the header into the buffer immediately + * preceding the checksum data, so we make sure to keep enough space in + * front of the checksum data to support the largest conceivable header. + */ + private int checksumStart; + private int checksumPos; + private final int dataStart; + private int dataPos; + private SpanId[] traceParents = EMPTY; + private int traceParentsUsed; + private TraceScope scope; + + /** + * Create a new packet. + * + * @param buf the buffer storing data and checksums + * @param chunksPerPkt maximum number of chunks per packet. + * @param offsetInBlock offset in bytes into the HDFS block. + * @param seqno the sequence number of this packet + * @param checksumSize the size of checksum + * @param lastPacketInBlock if this is the last packet + */ - DFSPacket(byte[] buf, int chunksPerPkt, long offsetInBlock, long seqno, ++ public DFSPacket(byte[] buf, int chunksPerPkt, long offsetInBlock, long seqno, + int checksumSize, boolean lastPacketInBlock) { + this.lastPacketInBlock = lastPacketInBlock; + this.numChunks = 0; + this.offsetInBlock = offsetInBlock; + this.seqno = seqno; + + this.buf = buf; + + checksumStart = PacketHeader.PKT_MAX_HEADER_LEN; + checksumPos = checksumStart; + dataStart = checksumStart + (chunksPerPkt * checksumSize); + dataPos = dataStart; + maxChunks = chunksPerPkt; + } + + /** + * Write data to this packet. + * + * @param inarray input array of data + * @param off the offset of data to write + * @param len the length of data to write + * @throws ClosedChannelException + */ + synchronized void writeData(byte[] inarray, int off, int len) + throws ClosedChannelException { + checkBuffer(); + if (dataPos + len > buf.length) { + throw new BufferOverflowException(); + } + System.arraycopy(inarray, off, buf, dataPos, len); + dataPos += len; + } + ++ public synchronized void writeData(ByteBuffer inBuffer, int len) ++ throws ClosedChannelException { ++ checkBuffer(); ++ len = len > inBuffer.remaining() ? inBuffer.remaining() : len; ++ if (dataPos + len > buf.length) { ++ throw new BufferOverflowException(); ++ } ++ for (int i = 0; i < len; i++) { ++ buf[dataPos + i] = inBuffer.get(); ++ } ++ dataPos += len; ++ } ++ + /** + * Write checksums to this packet + * + * @param inarray input array of checksums + * @param off the offset of checksums to write + * @param len the length of checksums to write + * @throws ClosedChannelException + */ - synchronized void writeChecksum(byte[] inarray, int off, int len) ++ public synchronized void writeChecksum(byte[] inarray, int off, int len) + throws ClosedChannelException { + checkBuffer(); + if (len == 0) { + return; + } + if (checksumPos + len > dataStart) { + throw new BufferOverflowException(); + } + System.arraycopy(inarray, off, buf, checksumPos, len); + checksumPos += len; + } + + /** + * Write the full packet, including the header, to the given output stream. + * + * @param stm + * @throws IOException + */ - synchronized void writeTo(DataOutputStream stm) throws IOException { ++ public synchronized void writeTo(DataOutputStream stm) throws IOException { + checkBuffer(); + + final int dataLen = dataPos - dataStart; + final int checksumLen = checksumPos - checksumStart; + final int pktLen = HdfsConstants.BYTES_IN_INTEGER + dataLen + checksumLen; + + PacketHeader header = new PacketHeader( + pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen, syncBlock); + + if (checksumPos != dataStart) { + // Move the checksum to cover the gap. This can happen for the last + // packet or during an hflush/hsync call. + System.arraycopy(buf, checksumStart, buf, + dataStart - checksumLen , checksumLen); + checksumPos = dataStart; + checksumStart = checksumPos - checksumLen; + } + + final int headerStart = checksumStart - header.getSerializedSize(); + assert checksumStart + 1 >= header.getSerializedSize(); + assert headerStart >= 0; + assert headerStart + header.getSerializedSize() == checksumStart; + + // Copy the header data into the buffer immediately preceding the checksum + // data. + System.arraycopy(header.getBytes(), 0, buf, headerStart, + header.getSerializedSize()); + + // corrupt the data for testing. + if (DFSClientFaultInjector.get().corruptPacket()) { + buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= 0xff; + } + + // Write the now contiguous full packet to the output stream. + stm.write(buf, headerStart, header.getSerializedSize() + checksumLen + dataLen); + + // undo corruption. + if (DFSClientFaultInjector.get().uncorruptPacket()) { + buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= 0xff; + } + } + + private synchronized void checkBuffer() throws ClosedChannelException { + if (buf == null) { + throw new ClosedChannelException(); + } + } + + /** + * Release the buffer in this packet to ByteArrayManager. + * + * @param bam + */ + synchronized void releaseBuffer(ByteArrayManager bam) { + bam.release(buf); + buf = null; + } + + /** + * get the packet's last byte's offset in the block + * + * @return the packet's last byte's offset in the block + */ + synchronized long getLastByteOffsetBlock() { + return offsetInBlock + dataPos - dataStart; + } + + /** + * Check if this packet is a heart beat packet + * + * @return true if the sequence number is HEART_BEAT_SEQNO + */ + boolean isHeartbeatPacket() { + return seqno == HEART_BEAT_SEQNO; + } + + /** + * check if this packet is the last packet in block + * + * @return true if the packet is the last packet + */ - boolean isLastPacketInBlock(){ ++ boolean isLastPacketInBlock() { + return lastPacketInBlock; + } + + /** + * get sequence number of this packet + * + * @return the sequence number of this packet + */ - long getSeqno(){ ++ long getSeqno() { + return seqno; + } + + /** + * get the number of chunks this packet contains + * + * @return the number of chunks in this packet + */ - synchronized int getNumChunks(){ ++ synchronized int getNumChunks() { + return numChunks; + } + + /** + * increase the number of chunks by one + */ - synchronized void incNumChunks(){ ++ synchronized void incNumChunks() { + numChunks++; + } + + /** + * get the maximum number of packets + * + * @return the maximum number of packets + */ - int getMaxChunks(){ ++ int getMaxChunks() { + return maxChunks; + } + + /** + * set if to sync block + * + * @param syncBlock if to sync block + */ - synchronized void setSyncBlock(boolean syncBlock){ ++ synchronized void setSyncBlock(boolean syncBlock) { + this.syncBlock = syncBlock; + } + + @Override + public String toString() { + return "packet seqno: " + this.seqno + + " offsetInBlock: " + this.offsetInBlock + + " lastPacketInBlock: " + this.lastPacketInBlock + + " lastByteOffsetInBlock: " + this.getLastByteOffsetBlock(); + } + + /** + * Add a trace parent span for this packet.<p/> + * + * Trace parent spans for a packet are the trace spans responsible for + * adding data to that packet. We store them as an array of longs for + * efficiency.<p/> + * + * Protected by the DFSOutputStream dataQueue lock. + */ + public void addTraceParent(Span span) { + if (span == null) { + return; + } + addTraceParent(span.getSpanId()); + } + + public void addTraceParent(SpanId id) { + if (!id.isValid()) { + return; + } + if (traceParentsUsed == traceParents.length) { + int newLength = (traceParents.length == 0) ? 8 : + traceParents.length * 2; + traceParents = Arrays.copyOf(traceParents, newLength); + } + traceParents[traceParentsUsed] = id; + traceParentsUsed++; + } + + /** + * Get the trace parent spans for this packet.<p/> + * + * Will always be non-null.<p/> + * + * Protected by the DFSOutputStream dataQueue lock. + */ + public SpanId[] getTraceParents() { + // Remove duplicates from the array. + int len = traceParentsUsed; + Arrays.sort(traceParents, 0, len); + int i = 0, j = 0; + SpanId prevVal = SpanId.INVALID; + while (true) { + if (i == len) { + break; + } + SpanId val = traceParents[i]; + if (!val.equals(prevVal)) { + traceParents[j] = val; + j++; + prevVal = val; + } + i++; + } + if (j < traceParents.length) { + traceParents = Arrays.copyOf(traceParents, j); + traceParentsUsed = traceParents.length; + } + return traceParents; + } + + public void setTraceScope(TraceScope scope) { + this.scope = scope; + } + + public TraceScope getTraceScope() { + return scope; + } + }