Repository: hadoop
Updated Branches:
  refs/heads/trunk 48aa3b727 -> ab638e77b
HDFS-6865. Byte array native checksumming on client side. Contributed by James Thomas.

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ab638e77
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ab638e77
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ab638e77

Branch: refs/heads/trunk
Commit: ab638e77b811d9592470f7d342cd11a66efbbf0d
Parents: 48aa3b7
Author: Todd Lipcon <t...@apache.org>
Authored: Thu Aug 28 16:44:09 2014 -0700
Committer: Todd Lipcon <t...@apache.org>
Committed: Thu Aug 28 16:44:09 2014 -0700

----------------------------------------------------------------------
 .../apache/hadoop/fs/ChecksumFileSystem.java    |   8 +-
 .../java/org/apache/hadoop/fs/ChecksumFs.java   |   8 +-
 .../org/apache/hadoop/fs/FSOutputSummer.java    | 107 ++++++++++++-------
 .../org/apache/hadoop/util/DataChecksum.java    |   2 +
 .../org/apache/hadoop/util/NativeCrc32.java     |   2 +-
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../org/apache/hadoop/hdfs/DFSOutputStream.java |  38 ++-----
 .../org/apache/hadoop/hdfs/TestFileAppend.java  |   4 +-
 .../security/token/block/TestBlockToken.java    |   2 +
 .../namenode/TestBlockUnderConstruction.java    |   3 +
 .../namenode/TestDecommissioningStatus.java     |   3 +
 11 files changed, 108 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab638e77/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java
index 511ca7f..c8d1b69 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java
@@ -381,7 +381,8 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
                           long blockSize,
                           Progressable progress)
       throws IOException {
-      super(DataChecksum.newCrc32(), fs.getBytesPerSum(), 4);
+      super(DataChecksum.newDataChecksum(DataChecksum.Type.CRC32,
+          fs.getBytesPerSum()));
       int bytesPerSum = fs.getBytesPerSum();
       this.datas = fs.getRawFileSystem().create(file, overwrite, bufferSize,
                                          replication, blockSize, progress);
@@ -405,10 +406,11 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
     }
 
     @Override
-    protected void writeChunk(byte[] b, int offset, int len, byte[] checksum)
+    protected void writeChunk(byte[] b, int offset, int len, byte[] checksum,
+        int ckoff, int cklen)
     throws IOException {
       datas.write(b, offset, len);
-      sums.write(checksum);
+      sums.write(checksum, ckoff, cklen);
     }
 
     @Override
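
A note on the constructor change above: the old super(DataChecksum.newCrc32(), fs.getBytesPerSum(), 4) call passed the chunk size and the hard-coded CRC width separately; a single DataChecksum now carries both. A minimal sketch of the equivalent construction (the 512-byte chunk size is an illustrative value, not taken from this commit):

    import org.apache.hadoop.util.DataChecksum;

    public class CrcSumSketch {
      public static void main(String[] args) {
        // One object now answers both sizing questions that the old
        // (Checksum, maxChunkSize, checksumSize) triple encoded.
        DataChecksum sum = DataChecksum.newDataChecksum(
            DataChecksum.Type.CRC32, 512 /* bytesPerSum, example value */);
        System.out.println(sum.getBytesPerChecksum()); // 512
        System.out.println(sum.getChecksumSize());     // 4 (CRC32 width)
      }
    }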

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab638e77/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFs.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFs.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFs.java
index 4be3b29..ab5cd13 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFs.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFs.java
@@ -337,7 +337,8 @@ public abstract class ChecksumFs extends FilterFs {
         final short replication, final long blockSize,
         final Progressable progress, final ChecksumOpt checksumOpt,
         final boolean createParent) throws IOException {
-      super(DataChecksum.newCrc32(), fs.getBytesPerSum(), 4);
+      super(DataChecksum.newDataChecksum(DataChecksum.Type.CRC32,
+          fs.getBytesPerSum()));
 
       // checksumOpt is passed down to the raw fs. Unless it implements
       // checksum impelemts internally, checksumOpt will be ignored.
@@ -370,10 +371,11 @@ public abstract class ChecksumFs extends FilterFs {
     }
 
     @Override
-    protected void writeChunk(byte[] b, int offset, int len, byte[] checksum)
+    protected void writeChunk(byte[] b, int offset, int len, byte[] checksum,
+        int ckoff, int cklen)
       throws IOException {
       datas.write(b, offset, len);
-      sums.write(checksum);
+      sums.write(checksum, ckoff, cklen);
    }
 
     @Override
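
Both ChecksumFileSystem and ChecksumFs now receive the checksum as a slice (ckoff, cklen) of a shared buffer rather than as a whole array. A toy illustration of the new writeChunk contract, outside Hadoop (the TwoStreamChunkWriter name is invented for this sketch):

    import java.io.IOException;
    import java.io.OutputStream;

    // Mirrors the pattern above: write the data chunk to one stream, then
    // only this chunk's cklen checksum bytes, starting at ckoff, to another.
    class TwoStreamChunkWriter {
      private final OutputStream datas; // raw data
      private final OutputStream sums;  // checksums

      TwoStreamChunkWriter(OutputStream datas, OutputStream sums) {
        this.datas = datas;
        this.sums = sums;
      }

      void writeChunk(byte[] b, int offset, int len,
          byte[] checksum, int ckoff, int cklen) throws IOException {
        datas.write(b, offset, len);
        sums.write(checksum, ckoff, cklen); // honor the slice, not the array
      }
    }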

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab638e77/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java
index 49c919a..19cbb6f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java
@@ -18,13 +18,14 @@
 package org.apache.hadoop.fs;
 
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.util.DataChecksum;
+
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.zip.Checksum;
 
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
 /**
  * This is a generic output stream for generating checksums for
  * data before it is written to the underlying stream
@@ -33,7 +34,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 @InterfaceStability.Unstable
 abstract public class FSOutputSummer extends OutputStream {
   // data checksum
-  private Checksum sum;
+  private final DataChecksum sum;
   // internal buffer for storing data before it is checksumed
   private byte buf[];
   // internal buffer for storing checksum
@@ -41,18 +42,24 @@ abstract public class FSOutputSummer extends OutputStream {
   // The number of valid bytes in the buffer.
   private int count;
 
-  protected FSOutputSummer(Checksum sum, int maxChunkSize, int checksumSize) {
+  // We want this value to be a multiple of 3 because the native code checksums
+  // 3 chunks simultaneously. The chosen value of 9 strikes a balance between
+  // limiting the number of JNI calls and flushing to the underlying stream
+  // relatively frequently.
+  private static final int BUFFER_NUM_CHUNKS = 9;
+
+  protected FSOutputSummer(DataChecksum sum) {
     this.sum = sum;
-    this.buf = new byte[maxChunkSize];
-    this.checksum = new byte[checksumSize];
+    this.buf = new byte[sum.getBytesPerChecksum() * BUFFER_NUM_CHUNKS];
+    this.checksum = new byte[sum.getChecksumSize() * BUFFER_NUM_CHUNKS];
     this.count = 0;
   }
 
   /* write the data chunk in <code>b</code> staring at <code>offset</code> with
-   * a length of <code>len</code>, and its checksum
+   * a length of <code>len > 0</code>, and its checksum
    */
-  protected abstract void writeChunk(byte[] b, int offset, int len, byte[] checksum)
-  throws IOException;
+  protected abstract void writeChunk(byte[] b, int bOffset, int bLen,
+      byte[] checksum, int checksumOffset, int checksumLen) throws IOException;
 
   /**
    * Check if the implementing OutputStream is closed and should no longer
@@ -66,7 +73,6 @@ abstract public class FSOutputSummer extends OutputStream {
   /** Write one byte */
   @Override
   public synchronized void write(int b) throws IOException {
-    sum.update(b);
     buf[count++] = (byte)b;
     if(count == buf.length) {
       flushBuffer();
@@ -111,18 +117,17 @@
    */
   private int write1(byte b[], int off, int len) throws IOException {
     if(count==0 && len>=buf.length) {
-      // local buffer is empty and user data has one chunk
-      // checksum and output data
+      // local buffer is empty and user buffer size >= local buffer size, so
+      // simply checksum the user buffer and send it directly to the underlying
+      // stream
       final int length = buf.length;
-      sum.update(b, off, length);
-      writeChecksumChunk(b, off, length, false);
+      writeChecksumChunks(b, off, length);
       return length;
     }
 
     // copy user data to local buffer
     int bytesToCopy = buf.length-count;
     bytesToCopy = (len<bytesToCopy) ? len : bytesToCopy;
-    sum.update(b, off, bytesToCopy);
     System.arraycopy(b, off, buf, count, bytesToCopy);
     count += bytesToCopy;
     if (count == buf.length) {
@@ -136,22 +141,45 @@
    * the underlying output stream.
    */
   protected synchronized void flushBuffer() throws IOException {
-    flushBuffer(false);
+    flushBuffer(false, true);
   }
 
-  /* Forces any buffered output bytes to be checksumed and written out to
-   * the underlying output stream. If keep is true, then the state of
-   * this object remains intact.
+  /* Forces buffered output bytes to be checksummed and written out to
+   * the underlying output stream. If there is a trailing partial chunk in the
+   * buffer,
+   * 1) flushPartial tells us whether to flush that chunk
+   * 2) if flushPartial is true, keep tells us whether to keep that chunk in the
+   * buffer (if flushPartial is false, it is always kept in the buffer)
+   *
+   * Returns the number of bytes that were flushed but are still left in the
+   * buffer (can only be non-zero if keep is true).
    */
-  protected synchronized void flushBuffer(boolean keep) throws IOException {
-    if (count != 0) {
-      int chunkLen = count;
+  protected synchronized int flushBuffer(boolean keep,
+      boolean flushPartial) throws IOException {
+    int bufLen = count;
+    int partialLen = bufLen % sum.getBytesPerChecksum();
+    int lenToFlush = flushPartial ? bufLen : bufLen - partialLen;
+    if (lenToFlush != 0) {
+      writeChecksumChunks(buf, 0, lenToFlush);
+      if (!flushPartial || keep) {
+        count = partialLen;
+        System.arraycopy(buf, bufLen - count, buf, 0, count);
+      } else {
       count = 0;
-      writeChecksumChunk(buf, 0, chunkLen, keep);
-      if (keep) {
-        count = chunkLen;
       }
     }
+
+    // total bytes left minus unflushed bytes left
+    return count - (bufLen - lenToFlush);
+  }
+
+  /**
+   * Checksums all complete data chunks and flushes them to the underlying
+   * stream. If there is a trailing partial chunk, it is not flushed and is
+   * maintained in the buffer.
+   */
+  public void flush() throws IOException {
+    flushBuffer(false, false);
+  }
 
   /**
@@ -161,18 +189,18 @@
     return count;
   }
 
-  /** Generate checksum for the data chunk and output data chunk & checksum
-   * to the underlying output stream. If keep is true then keep the
-   * current checksum intact, do not reset it.
+  /** Generate checksums for the given data chunks and output chunks & checksums
+   * to the underlying output stream.
    */
-  private void writeChecksumChunk(byte b[], int off, int len, boolean keep)
+  private void writeChecksumChunks(byte b[], int off, int len)
   throws IOException {
-    int tempChecksum = (int)sum.getValue();
-    if (!keep) {
-      sum.reset();
+    sum.calculateChunkedSums(b, off, len, checksum, 0);
+    for (int i = 0; i < len; i += sum.getBytesPerChecksum()) {
+      int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i);
+      int ckOffset = i / sum.getBytesPerChecksum() * sum.getChecksumSize();
+      writeChunk(b, off + i, chunkLen, checksum, ckOffset,
+          sum.getChecksumSize());
     }
-    int2byte(tempChecksum, checksum);
-    writeChunk(b, off, len, checksum);
   }
 
   /**
@@ -196,9 +224,14 @@
   /**
    * Resets existing buffer with a new one of the specified size.
    */
-  protected synchronized void resetChecksumChunk(int size) {
-    sum.reset();
+  protected synchronized void setChecksumBufSize(int size) {
     this.buf = new byte[size];
+    this.checksum = new byte[((size - 1) / sum.getBytesPerChecksum() + 1) *
+        sum.getChecksumSize()];
     this.count = 0;
   }
+
+  protected synchronized void resetChecksumBufSize() {
+    setChecksumBufSize(sum.getBytesPerChecksum() * BUFFER_NUM_CHUNKS);
+  }
 }
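
To make the BUFFER_NUM_CHUNKS comment concrete, here is the sizing arithmetic from the constructor and setChecksumBufSize as a standalone sketch (the formulas come from the patch; the 512, 4, and 300 values are illustrative):

    public class BufferSizingSketch {
      public static void main(String[] args) {
        int bytesPerChecksum = 512; // typical chunk size
        int checksumSize = 4;       // CRC32 width
        int bufferNumChunks = 9;    // multiple of 3: the native code
                                    // checksums three chunks per call

        // Constructor sizing: nine chunks of data, nine checksums.
        int bufLen = bytesPerChecksum * bufferNumChunks; // 4608 bytes
        int ckLen = checksumSize * bufferNumChunks;      // 36 bytes

        // setChecksumBufSize sizing: ceil(size / bytesPerChecksum) checksums,
        // e.g. when an append must first fill the remainder of a partial chunk.
        int size = 300;
        int ckBufLen = ((size - 1) / bytesPerChecksum + 1) * checksumSize; // 4
        System.out.println(bufLen + " " + ckLen + " " + ckBufLen);
      }
    }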

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab638e77/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java
index 1636af6..9f0ee35 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java
@@ -339,6 +339,7 @@ public class DataChecksum implements Checksum {
       byte[] data, int dataOff, int dataLen,
       byte[] checksums, int checksumsOff, String fileName,
       long basePos) throws ChecksumException {
+    if (type.size == 0) return;
 
     if (NativeCrc32.isAvailable()) {
       NativeCrc32.verifyChunkedSumsByteArray(bytesPerChecksum, type.id,
@@ -421,6 +422,7 @@ public class DataChecksum implements Checksum {
   public void calculateChunkedSums(
       byte[] data, int dataOffset, int dataLength,
       byte[] sums, int sumsOffset) {
+    if (type.size == 0) return;
 
     if (NativeCrc32.isAvailable()) {
       NativeCrc32.calculateChunkedSumsByteArray(bytesPerChecksum, type.id,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab638e77/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeCrc32.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeCrc32.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeCrc32.java
index 2f21ae1..0807d2c 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeCrc32.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeCrc32.java
@@ -42,7 +42,7 @@ class NativeCrc32 {
    * modified.
    * 
    * @param bytesPerSum the chunk size (eg 512 bytes)
-   * @param checksumType the DataChecksum type constant
+   * @param checksumType the DataChecksum type constant (NULL is not supported)
    * @param sums the DirectByteBuffer pointing at the beginning of the
    *             stored checksums
    * @param data the DirectByteBuffer pointing at the beginning of the

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab638e77/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 2c56407..8268b6b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -434,6 +434,9 @@ Release 2.6.0 - UNRELEASED
     HDFS-6773. MiniDFSCluster should skip edit log fsync by default (Stephen
     Chu via Colin Patrick McCabe)
 
+    HDFS-6865. Byte array native checksumming on client side
+    (James Thomas via todd)
+
   BUG FIXES
 
     HDFS-6823. dfs.web.authentication.kerberos.principal shows up in logs for
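
The two guards added to DataChecksum make the NULL checksum type (type.size == 0) a no-op before reaching NativeCrc32, which does not support it. For orientation, a sketch of the batched API that FSOutputSummer#writeChecksumChunks now drives, using the same offset arithmetic (array sizes are example values):

    import org.apache.hadoop.util.DataChecksum;

    public class ChunkedSumsSketch {
      public static void main(String[] args) {
        DataChecksum sum = DataChecksum.newDataChecksum(
            DataChecksum.Type.CRC32, 512);
        byte[] data = new byte[9 * 512];                   // nine full chunks
        byte[] sums = new byte[9 * sum.getChecksumSize()]; // one CRC per chunk

        // One call checksums every chunk; the native byte-array path is
        // taken when NativeCrc32.isAvailable().
        sum.calculateChunkedSums(data, 0, data.length, sums, 0);

        // The checksum for the chunk at byte offset i lives here:
        for (int i = 0; i < data.length; i += sum.getBytesPerChecksum()) {
          int ckOffset = i / sum.getBytesPerChecksum() * sum.getChecksumSize();
          System.out.println("chunk @ " + i + " -> checksum @ " + ckOffset);
        }
      }
    }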

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab638e77/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 14977a2..0b5ecda 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -398,7 +398,7 @@ public class DFSOutputStream extends FSOutputSummer
         // one chunk that fills up the partial chunk.
         //
         computePacketChunkSize(0, freeInCksum);
-        resetChecksumChunk(freeInCksum);
+        setChecksumBufSize(freeInCksum);
         appendChunk = true;
       } else {
         // if the remaining space in the block is smaller than
@@ -1563,7 +1563,7 @@ public class DFSOutputStream extends FSOutputSummer
   private DFSOutputStream(DFSClient dfsClient, String src, Progressable progress,
       HdfsFileStatus stat, DataChecksum checksum) throws IOException {
-    super(checksum, checksum.getBytesPerChecksum(), checksum.getChecksumSize());
+    super(checksum);
     this.dfsClient = dfsClient;
     this.src = src;
     this.fileId = stat.getFileId();
@@ -1717,22 +1717,21 @@ public class DFSOutputStream extends FSOutputSummer
   // @see FSOutputSummer#writeChunk()
   @Override
-  protected synchronized void writeChunk(byte[] b, int offset, int len, byte[] checksum)
-      throws IOException {
+  protected synchronized void writeChunk(byte[] b, int offset, int len,
+      byte[] checksum, int ckoff, int cklen) throws IOException {
     dfsClient.checkOpen();
     checkClosed();
 
-    int cklen = checksum.length;
     int bytesPerChecksum = this.checksum.getBytesPerChecksum();
     if (len > bytesPerChecksum) {
       throw new IOException("writeChunk() buffer size is " + len +
                             " is larger than supported bytesPerChecksum " +
                             bytesPerChecksum);
     }
-    if (checksum.length != this.checksum.getChecksumSize()) {
+    if (cklen != this.checksum.getChecksumSize()) {
       throw new IOException("writeChunk() checksum size is supposed to be " +
                             this.checksum.getChecksumSize() +
-                            " but found to be " + checksum.length);
+                            " but found to be " + cklen);
     }
 
     if (currentPacket == null) {
@@ -1748,7 +1747,7 @@ public class DFSOutputStream extends FSOutputSummer
       }
     }
 
-    currentPacket.writeChecksum(checksum, 0, cklen);
+    currentPacket.writeChecksum(checksum, ckoff, cklen);
     currentPacket.writeData(b, offset, len);
     currentPacket.numChunks++;
     bytesCurBlock += len;
@@ -1772,7 +1771,7 @@ public class DFSOutputStream extends FSOutputSummer
       // crc chunks from now on.
       if (appendChunk && bytesCurBlock%bytesPerChecksum == 0) {
         appendChunk = false;
-        resetChecksumChunk(bytesPerChecksum);
+        resetChecksumBufSize();
       }
 
       if (!appendChunk) {
@@ -1853,20 +1852,13 @@ public class DFSOutputStream extends FSOutputSummer
       long lastBlockLength = -1L;
       boolean updateLength = syncFlags.contains(SyncFlag.UPDATE_LENGTH);
       synchronized (this) {
-        /* Record current blockOffset. This might be changed inside
-         * flushBuffer() where a partial checksum chunk might be flushed.
-         * After the flush, reset the bytesCurBlock back to its previous value,
-         * any partial checksum chunk will be sent now and in next packet.
-         */
-        long saveOffset = bytesCurBlock;
-        Packet oldCurrentPacket = currentPacket;
         // flush checksum buffer, but keep checksum buffer intact
-        flushBuffer(true);
+        int numKept = flushBuffer(true, true);
        // bytesCurBlock potentially incremented if there was buffered data
 
         if (DFSClient.LOG.isDebugEnabled()) {
           DFSClient.LOG.debug(
-            "DFSClient flush() : saveOffset " + saveOffset +
+            "DFSClient flush() :" +
             " bytesCurBlock " + bytesCurBlock +
             " lastFlushOffset " + lastFlushOffset);
         }
@@ -1883,14 +1875,6 @@ public class DFSOutputStream extends FSOutputSummer
               bytesCurBlock, currentSeqno++, this.checksum.getChecksumSize());
         }
       } else {
-        // We already flushed up to this offset.
-        // This means that we haven't written anything since the last flush
-        // (or the beginning of the file). Hence, we should not have any
-        // packet queued prior to this call, since the last flush set
-        // currentPacket = null.
-        assert oldCurrentPacket == null :
-          "Empty flush should not occur with a currentPacket";
-
         if (isSync && bytesCurBlock > 0) {
           // Nothing to send right now,
           // and the block was partially written,
@@ -1910,7 +1894,7 @@ public class DFSOutputStream extends FSOutputSummer
         // Restore state of stream. Record the last flush offset
         // of the last full chunk that was flushed.
         //
-        bytesCurBlock = saveOffset;
+        bytesCurBlock -= numKept;
 
         toWaitFor = lastQueuedSeqno;
       } // end synchronized
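
The hflush path above swaps the old save-and-restore of bytesCurBlock for arithmetic on flushBuffer's new return value. A worked example of that bookkeeping under assumed numbers (512-byte chunks, 1124 buffered bytes):

    public class HflushAccountingSketch {
      public static void main(String[] args) {
        int bytesPerChecksum = 512;
        int buffered = 2 * 512 + 100; // two full chunks plus a 100-byte tail
        // flushBuffer(true, true) flushes all 1124 bytes but keeps the
        // partial chunk in the buffer, so it returns numKept == 100.
        int numKept = buffered % bytesPerChecksum;
        long bytesCurBlock = buffered; // incremented as chunks were written
        // The kept 100 bytes will be flushed (and counted) again in the next
        // packet, so the stream rolls its offset back to the last full chunk.
        bytesCurBlock -= numKept;
        System.out.println(bytesCurBlock); // 1024
      }
    }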

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab638e77/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
index d840077..34c701d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
@@ -261,7 +261,9 @@ public class TestFileAppend{
         start += 29;
       }
       stm.write(fileContents, start, AppendTestUtil.FILE_SIZE -start);
-
+      // need to make sure we completely write out all full blocks before
+      // the checkFile() call (see FSOutputSummer#flush)
+      stm.flush();
       // verify that full blocks are sane
       checkFile(fs, file1, 1);
       stm.close();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab638e77/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
index 2429345..1fe7ba8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
@@ -394,6 +394,8 @@ public class TestBlockToken {
     Path filePath = new Path(fileName);
     FSDataOutputStream out = fs.create(filePath, (short) 1);
     out.write(new byte[1000]);
+    // ensure that the first block is written out (see FSOutputSummer#flush)
+    out.flush();
     LocatedBlocks locatedBlocks = cluster.getNameNodeRpc().getBlockLocations(
         fileName, 0, 1000);
     while (locatedBlocks.getLastLocatedBlock() == null) {
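
This test change and the three like it (TestFileAppend above; the two namenode tests below) all follow from the new FSOutputSummer#flush semantics: written bytes now sit in the nine-chunk client-side buffer until it fills, and flush() pushes out only the complete chunks. Tests that immediately inspect block state must therefore flush explicitly. The recurring fragment (fs and filePath as in the surrounding tests):

    FSDataOutputStream out = fs.create(filePath, (short) 1);
    out.write(new byte[1000]);
    // Without this, the bytes may still be buffered client-side and no
    // block would be visible to the NameNode yet.
    out.flush();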

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab638e77/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockUnderConstruction.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockUnderConstruction.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockUnderConstruction.java
index 5448e7a..872ff9c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockUnderConstruction.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockUnderConstruction.java
@@ -70,6 +70,9 @@ public class TestBlockUnderConstruction {
     long blocksBefore = stm.getPos() / BLOCK_SIZE;
     TestFileCreation.writeFile(stm, BLOCK_SIZE);
+    // need to make sure the full block is completely flushed to the DataNodes
+    // (see FSOutputSummer#flush)
+    stm.flush();
     int blocksAfter = 0;
     // wait until the block is allocated by DataStreamer
     BlockLocation[] locatedBlocks;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab638e77/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java
index d01df75..2ee251b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java
@@ -141,6 +141,9 @@ public class TestDecommissioningStatus {
     Random rand = new Random(seed);
     rand.nextBytes(buffer);
     stm.write(buffer);
+    // need to make sure that we actually write out both file blocks
+    // (see FSOutputSummer#flush)
+    stm.flush();
     // Do not close stream, return it
     // so that it is not garbage collected
     return stm;