Repository: hadoop
Updated Branches:
  refs/heads/trunk 2ad668748 -> 1c13519e1
HDFS-8605. Merge Refactor of DFSOutputStream from HDFS-7285 branch. (vinayakumarb)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1c13519e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1c13519e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1c13519e

Branch: refs/heads/trunk
Commit: 1c13519e1e7588c3e2974138d37bf3449ca8b3df
Parents: 2ad6687
Author: Andrew Wang <w...@apache.org>
Authored: Thu Jun 18 08:48:09 2015 -0700
Committer: Andrew Wang <w...@apache.org>
Committed: Thu Jun 18 08:48:09 2015 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt      |  3 +
 .../org/apache/hadoop/hdfs/DFSOutputStream.java  | 59 ++++++++++----------
 .../org/apache/hadoop/hdfs/DataStreamer.java     |  7 ++-
 3 files changed, 40 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c13519e/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 2545bcf..a61cf78 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -656,6 +656,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-6249. Output AclEntry in PBImageXmlWriter.
     (surendra singh lilhore via aajisaka)
 
+    HDFS-8605. Merge Refactor of DFSOutputStream from HDFS-7285 branch.
+    (vinayakumarb via wang)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c13519e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 4622be6..c16aef2 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -64,6 +64,8 @@ import org.apache.hadoop.util.Time;
 import org.apache.htrace.Sampler;
 import org.apache.htrace.Trace;
 import org.apache.htrace.TraceScope;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -86,6 +88,7 @@ import com.google.common.base.Preconditions;
 @InterfaceAudience.Private
 public class DFSOutputStream extends FSOutputSummer
     implements Syncable, CanSetDropBehind {
+  static final Logger LOG = LoggerFactory.getLogger(DFSOutputStream.class);
   /**
    * Number of times to retry creating a file when there are transient
    * errors (typically related to encryption zones and KeyProvider operations).
@@ -413,21 +416,30 @@ public class DFSOutputStream extends FSOutputSummer
     //
     if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
         getStreamer().getBytesCurBlock() == blockSize) {
-      if (DFSClient.LOG.isDebugEnabled()) {
-        DFSClient.LOG.debug("DFSClient writeChunk packet full seqno=" +
-            currentPacket.getSeqno() +
-            ", src=" + src +
-            ", bytesCurBlock=" + getStreamer().getBytesCurBlock() +
-            ", blockSize=" + blockSize +
-            ", appendChunk=" + getStreamer().getAppendChunk());
-      }
-      getStreamer().waitAndQueuePacket(currentPacket);
-      currentPacket = null;
+      enqueueCurrentPacketFull();
+    }
+  }
 
-      adjustChunkBoundary();
+  void enqueueCurrentPacket() throws IOException {
+    getStreamer().waitAndQueuePacket(currentPacket);
+    currentPacket = null;
+  }
 
-      endBlock();
-    }
+  void enqueueCurrentPacketFull() throws IOException {
+    LOG.debug("enqueue full {}, src={}, bytesCurBlock={}, blockSize={},"
+        + " appendChunk={}, {}", currentPacket, src, getStreamer()
+        .getBytesCurBlock(), blockSize, getStreamer().getAppendChunk(),
+        getStreamer());
+    enqueueCurrentPacket();
+    adjustChunkBoundary();
+    endBlock();
+  }
+
+  /** create an empty packet to mark the end of the block. */
+  void setCurrentPacketToEmpty() throws InterruptedIOException {
+    currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(),
+        getStreamer().getAndIncCurrentSeqno(), true);
+    currentPacket.setSyncBlock(shouldSyncBlock);
   }
 
   /**
@@ -457,11 +469,8 @@ public class DFSOutputStream extends FSOutputSummer
    */
   protected void endBlock() throws IOException {
     if (getStreamer().getBytesCurBlock() == blockSize) {
-      currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(),
-          getStreamer().getAndIncCurrentSeqno(), true);
-      currentPacket.setSyncBlock(shouldSyncBlock);
-      getStreamer().waitAndQueuePacket(currentPacket);
-      currentPacket = null;
+      setCurrentPacketToEmpty();
+      enqueueCurrentPacket();
       getStreamer().setBytesCurBlock(0);
       lastFlushOffset = 0;
     }
@@ -586,8 +595,7 @@ public class DFSOutputStream extends FSOutputSummer
       }
       if (currentPacket != null) {
         currentPacket.setSyncBlock(isSync);
-        getStreamer().waitAndQueuePacket(currentPacket);
-        currentPacket = null;
+        enqueueCurrentPacket();
       }
       if (endBlock && getStreamer().getBytesCurBlock() > 0) {
         // Need to end the current block, thus send an empty packet to
@@ -595,8 +603,7 @@ public class DFSOutputStream extends FSOutputSummer
         currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(),
             getStreamer().getAndIncCurrentSeqno(), true);
         currentPacket.setSyncBlock(shouldSyncBlock || isSync);
-        getStreamer().waitAndQueuePacket(currentPacket);
-        currentPacket = null;
+        enqueueCurrentPacket();
         getStreamer().setBytesCurBlock(0);
         lastFlushOffset = 0;
       } else {
@@ -775,15 +782,11 @@ public class DFSOutputStream extends FSOutputSummer
       flushBuffer();       // flush from all upper layers
 
       if (currentPacket != null) {
-        getStreamer().waitAndQueuePacket(currentPacket);
-        currentPacket = null;
+        enqueueCurrentPacket();
       }
 
       if (getStreamer().getBytesCurBlock() != 0) {
-        // send an empty packet to mark the end of the block
-        currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(),
-            getStreamer().getAndIncCurrentSeqno(), true);
-        currentPacket.setSyncBlock(shouldSyncBlock);
+        setCurrentPacketToEmpty();
      }
 
       flushInternal();             // flush all data to Datanodes
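
The DFSOutputStream hunks above fold three repeated sequences (queue and clear the current packet, log and queue a full packet, create an empty packet to end the block) into the new enqueueCurrentPacket, enqueueCurrentPacketFull, and setCurrentPacketToEmpty helpers, and move the debug message onto the class's new SLF4J logger with {} placeholders instead of a guarded string concatenation. What follows is a minimal standalone sketch of the resulting call flow, not code from the patch: Packet, Streamer, and the field names are simplified stand-ins for the real DFSPacket and DataStreamer types.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative sketch only: Packet and Streamer are simplified stand-ins for
// the real DFSPacket and DataStreamer classes touched by this commit.
class PacketFlowSketch {
  static final Logger LOG = LoggerFactory.getLogger(PacketFlowSketch.class);

  static class Packet {
    long seqno;
  }

  static class Streamer {
    long bytesCurBlock;
    private long seqno;
    void waitAndQueuePacket(Packet p) { /* hand off to the streamer thread */ }
    long getAndIncCurrentSeqno() { return seqno++; }
  }

  private final Streamer streamer = new Streamer();
  private final long blockSize = 128L * 1024 * 1024;
  private Packet currentPacket;

  // Mirrors enqueueCurrentPacket(): the queue-and-clear pair lives in one place.
  void enqueueCurrentPacket() {
    streamer.waitAndQueuePacket(currentPacket);
    currentPacket = null;
  }

  // Mirrors enqueueCurrentPacketFull(): parameterized SLF4J logging replaces
  // the old isDebugEnabled() guard plus concatenation, then the full packet is
  // queued and the block boundary is handled.
  void enqueueCurrentPacketFull() {
    LOG.debug("enqueue full seqno={}, bytesCurBlock={}, blockSize={}",
        currentPacket.seqno, streamer.bytesCurBlock, blockSize);
    enqueueCurrentPacket();
    endBlock();
  }

  // Mirrors setCurrentPacketToEmpty(): an empty packet marks the end of a block.
  void setCurrentPacketToEmpty() {
    currentPacket = new Packet();
    currentPacket.seqno = streamer.getAndIncCurrentSeqno();
  }

  // Mirrors the refactored endBlock(): callers compose the helpers instead of
  // repeating the create/queue/clear sequence inline.
  void endBlock() {
    if (streamer.bytesCurBlock == blockSize) {
      setCurrentPacketToEmpty();
      enqueueCurrentPacket();
      streamer.bytesCurBlock = 0;
    }
  }
}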

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c13519e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index cecd5a0..8dd85b7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -44,7 +44,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite;
 import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
@@ -1901,4 +1900,10 @@ class DataStreamer extends Daemon {
       s.close();
     }
   }
+
+  @Override
+  public String toString() {
+    return (block == null? null: block.getLocalBlock())
+        + "@" + Arrays.toString(getNodes());
+  }
 }
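
The new DataStreamer#toString above gives the streamer a null-safe, human-readable form (local block, then the pipeline nodes) that the parameterized LOG.debug call added in DFSOutputStream can print directly. Below is a small self-contained sketch of the same pattern, not from the patch; Block and the nodes field are hypothetical stand-ins for DataStreamer's actual block and pipeline fields.

import java.util.Arrays;

// Illustrative sketch only: Block and nodes stand in for DataStreamer's real
// block and pipeline-node fields, both of which may be null early in a write.
class StreamerToStringSketch {
  static class Block {
    final long id;
    Block(long id) { this.id = id; }
    String getLocalBlock() { return "blk_" + id; }
  }

  private Block block;      // null until a block is allocated
  private String[] nodes;   // null until a pipeline exists

  @Override
  public String toString() {
    // String concatenation renders a null reference as "null" rather than
    // throwing, and Arrays.toString(null) also returns "null".
    return (block == null ? null : block.getLocalBlock())
        + "@" + Arrays.toString(nodes);
  }

  public static void main(String[] args) {
    StreamerToStringSketch s = new StreamerToStringSketch();
    System.out.println(s);   // null@null
    s.block = new Block(42);
    s.nodes = new String[] {"dn1:9866", "dn2:9866"};
    System.out.println(s);   // blk_42@[dn1:9866, dn2:9866]
  }
}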