HBASE-17623 Reuse the bytes array when building the hfile block
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6bd31090 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6bd31090 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6bd31090 Branch: refs/heads/hbase-12439 Commit: 6bd3109062060f735c73b268c44022c201e6072b Parents: faf81d5 Author: Chia-Ping Tsai <chia7...@gmail.com> Authored: Wed Mar 22 03:50:48 2017 +0800 Committer: CHIA-PING TSAI <chia7...@gmail.com> Committed: Sat Mar 25 23:49:32 2017 +0800 ---------------------------------------------------------------------- .../HFileBlockDefaultEncodingContext.java | 44 +++----- .../io/encoding/HFileBlockEncodingContext.java | 14 ++- .../hadoop/hbase/io/hfile/HFileBlock.java | 109 ++++++++++--------- .../hadoop/hbase/io/hfile/TestHFileBlock.java | 4 +- 4 files changed, 87 insertions(+), 84 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/6bd31090/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultEncodingContext.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultEncodingContext.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultEncodingContext.java index c7821e3..1045f94 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultEncodingContext.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultEncodingContext.java @@ -19,13 +19,13 @@ package org.apache.hadoop.hbase.io.encoding; import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE; import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; import java.security.SecureRandom; import 
org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.ByteArrayOutputStream; import org.apache.hadoop.hbase.io.TagCompressionContext; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.crypto.Cipher; @@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.io.crypto.Encryption; import org.apache.hadoop.hbase.io.crypto.Encryptor; import org.apache.hadoop.hbase.io.hfile.BlockType; import org.apache.hadoop.hbase.io.hfile.HFileContext; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.compress.CompressionOutputStream; import org.apache.hadoop.io.compress.Compressor; @@ -48,7 +49,6 @@ import com.google.common.base.Preconditions; @InterfaceAudience.Private public class HFileBlockDefaultEncodingContext implements HFileBlockEncodingContext { - private byte[] onDiskBytesWithHeader; private BlockType blockType; private final DataBlockEncoding encodingAlgo; @@ -128,17 +128,12 @@ public class HFileBlockDefaultEncodingContext implements } @Override - public byte[] compressAndEncrypt(byte[] uncompressedBytesWithHeader) throws IOException { - compressAfterEncoding(uncompressedBytesWithHeader, dummyHeader); - return onDiskBytesWithHeader; + public Bytes compressAndEncrypt(byte[] data, int offset, int length) throws IOException { + return compressAfterEncoding(data, offset, length, dummyHeader); } - /** - * @param uncompressedBytesWithHeader - * @param headerBytes - * @throws IOException - */ - protected void compressAfterEncoding(byte[] uncompressedBytesWithHeader, byte[] headerBytes) + private Bytes compressAfterEncoding(byte[] uncompressedBytesWithHeaderBuffer, + int uncompressedBytesWithHeaderOffset, int uncompressedBytesWithHeaderLength, byte[] headerBytes) throws IOException { Encryption.Context cryptoContext = fileContext.getEncryptionContext(); if (cryptoContext != Encryption.Context.NONE) { @@ -162,17 +157,17 @@ public class HFileBlockDefaultEncodingContext implements if 
(fileContext.getCompression() != Compression.Algorithm.NONE) { compressedByteStream.reset(); compressionStream.resetState(); - compressionStream.write(uncompressedBytesWithHeader, - headerBytes.length, uncompressedBytesWithHeader.length - headerBytes.length); + compressionStream.write(uncompressedBytesWithHeaderBuffer, + headerBytes.length + uncompressedBytesWithHeaderOffset, uncompressedBytesWithHeaderLength - headerBytes.length); compressionStream.flush(); compressionStream.finish(); byte[] plaintext = compressedByteStream.toByteArray(); plaintextLength = plaintext.length; in = new ByteArrayInputStream(plaintext); } else { - plaintextLength = uncompressedBytesWithHeader.length - headerBytes.length; - in = new ByteArrayInputStream(uncompressedBytesWithHeader, - headerBytes.length, plaintextLength); + plaintextLength = uncompressedBytesWithHeaderLength - headerBytes.length; + in = new ByteArrayInputStream(uncompressedBytesWithHeaderBuffer, + headerBytes.length + uncompressedBytesWithHeaderOffset, plaintextLength); } if (plaintextLength > 0) { @@ -194,16 +189,13 @@ public class HFileBlockDefaultEncodingContext implements // Encrypt the data Encryption.encrypt(cryptoByteStream, in, encryptor); - onDiskBytesWithHeader = cryptoByteStream.toByteArray(); - // Increment the IV given the final block size - Encryption.incrementIv(iv, 1 + (onDiskBytesWithHeader.length / encryptor.getBlockSize())); - + Encryption.incrementIv(iv, 1 + (cryptoByteStream.size() / encryptor.getBlockSize())); + return new Bytes(cryptoByteStream.getBuffer(), 0, cryptoByteStream.size()); } else { cryptoByteStream.write(0); - onDiskBytesWithHeader = cryptoByteStream.toByteArray(); - + return new Bytes(cryptoByteStream.getBuffer(), 0, cryptoByteStream.size()); } } else { @@ -212,14 +204,14 @@ public class HFileBlockDefaultEncodingContext implements compressedByteStream.reset(); compressedByteStream.write(headerBytes); compressionStream.resetState(); - 
compressionStream.write(uncompressedBytesWithHeader, - headerBytes.length, uncompressedBytesWithHeader.length + compressionStream.write(uncompressedBytesWithHeaderBuffer, + headerBytes.length + uncompressedBytesWithHeaderOffset, uncompressedBytesWithHeaderLength - headerBytes.length); compressionStream.flush(); compressionStream.finish(); - onDiskBytesWithHeader = compressedByteStream.toByteArray(); + return new Bytes(compressedByteStream.getBuffer(), 0, compressedByteStream.size()); } else { - onDiskBytesWithHeader = uncompressedBytesWithHeader; + return null; } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/6bd31090/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockEncodingContext.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockEncodingContext.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockEncodingContext.java index 9dc14a4..30c2a16 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockEncodingContext.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockEncodingContext.java @@ -21,6 +21,7 @@ import java.io.IOException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.hfile.BlockType; import org.apache.hadoop.hbase.io.hfile.HFileContext; +import org.apache.hadoop.hbase.util.Bytes; /** * An encoding context that is created by a writer's encoder, and is shared @@ -73,9 +74,14 @@ public interface HFileBlockEncodingContext { EncodingState getEncodingState(); /** - * @param uncompressedBytesWithHeader encoded bytes with header - * @return Bytes with header which are ready to write out to disk. This is compressed and - * encrypted bytes applying the set compression algorithm and encryption. 
+ * @param data encoded bytes with header + * @param offset the offset in encoded data to start at + * @param length the number of encoded bytes + * @return Bytes with header which are ready to write out to disk. + * These are the bytes after applying the set compression + * algorithm and encryption. The returned bytes may be changed by a later call. + * If you need a Bytes reference for later use, clone the bytes and use that. + * Null if the data doesn't need to be compressed and encrypted. */ - byte[] compressAndEncrypt(byte[] uncompressedBytesWithHeader) throws IOException; + Bytes compressAndEncrypt(byte[] data, int offset, int length) throws IOException; } http://git-wip-us.apache.org/repos/asf/hbase/blob/6bd31090/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java index 4711cec..066a9fa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java @@ -326,7 +326,7 @@ public class HFileBlock implements Cacheable { /** * Creates a new {@link HFile} block from the given fields. This constructor - * is used when the block data has already been read and uncompressed, + * is used only while writing blocks and caching, * and is sitting in a byte buffer and we want to stuff the block into cache. * See {@link Writer#getBlockForCaching(CacheConfig)}. 
* @@ -338,8 +338,7 @@ public class HFileBlock implements Cacheable { * @param onDiskSizeWithoutHeader see {@link #onDiskSizeWithoutHeader} * @param uncompressedSizeWithoutHeader see {@link #uncompressedSizeWithoutHeader} * @param prevBlockOffset see {@link #prevBlockOffset} - * @param b block header ({@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes) followed by - * uncompressed data. + * @param b block header ({@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes) * @param fillHeader when true, write the first 4 header fields into passed buffer. * @param offset the file offset the block was read from * @param onDiskDataSizeWithHeader see {@link #onDiskDataSizeWithHeader} @@ -877,7 +876,7 @@ public class HFileBlock implements Cacheable { * if compression is turned on. It also includes the checksum data that * immediately follows the block data. (header + data + checksums) */ - private byte[] onDiskBlockBytesWithHeader; + private ByteArrayOutputStream onDiskBlockBytesWithHeader; /** * The size of the checksum data on disk. It is used only if data is @@ -888,15 +887,6 @@ public class HFileBlock implements Cacheable { private byte[] onDiskChecksum = HConstants.EMPTY_BYTE_ARRAY; /** - * Valid in the READY state. Contains the header and the uncompressed (but - * potentially encoded, if this is a data block) bytes, so the length is - * {@link #uncompressedSizeWithoutHeader} + - * {@link org.apache.hadoop.hbase.HConstants#HFILEBLOCK_HEADER_SIZE}. - * Does not store checksums. - */ - private byte[] uncompressedBlockBytesWithHeader; - - /** * Current block's start offset in the {@link HFile}. Set in * {@link #writeHeaderAndData(FSDataOutputStream)}. */ @@ -1023,42 +1013,42 @@ public class HFileBlock implements Cacheable { blockType = dataBlockEncodingCtx.getBlockType(); } userDataStream.flush(); - // This does an array copy, so it is safe to cache this byte array when cache-on-write. - // Header is still the empty, 'dummy' header that is yet to be filled out. 
- uncompressedBlockBytesWithHeader = baosInMemory.toByteArray(); prevOffset = prevOffsetByType[blockType.getId()]; // We need to set state before we can package the block up for cache-on-write. In a way, the // block is ready, but not yet encoded or compressed. state = State.BLOCK_READY; + Bytes compressAndEncryptDat; if (blockType == BlockType.DATA || blockType == BlockType.ENCODED_DATA) { - onDiskBlockBytesWithHeader = dataBlockEncodingCtx. - compressAndEncrypt(uncompressedBlockBytesWithHeader); + compressAndEncryptDat = dataBlockEncodingCtx. + compressAndEncrypt(baosInMemory.getBuffer(), 0, baosInMemory.size()); } else { - onDiskBlockBytesWithHeader = defaultBlockEncodingCtx. - compressAndEncrypt(uncompressedBlockBytesWithHeader); + compressAndEncryptDat = defaultBlockEncodingCtx. + compressAndEncrypt(baosInMemory.getBuffer(), 0, baosInMemory.size()); } + if (compressAndEncryptDat == null) { + compressAndEncryptDat = new Bytes(baosInMemory.getBuffer(), 0, baosInMemory.size()); + } + if (onDiskBlockBytesWithHeader == null) { + onDiskBlockBytesWithHeader = new ByteArrayOutputStream(compressAndEncryptDat.getLength()); + } + onDiskBlockBytesWithHeader.reset(); + onDiskBlockBytesWithHeader.write(compressAndEncryptDat.get(), + compressAndEncryptDat.getOffset(), compressAndEncryptDat.getLength()); // Calculate how many bytes we need for checksum on the tail of the block. int numBytes = (int) ChecksumUtil.numBytes( - onDiskBlockBytesWithHeader.length, + onDiskBlockBytesWithHeader.size(), fileContext.getBytesPerChecksum()); // Put the header for the on disk bytes; header currently is unfilled-out - putHeader(onDiskBlockBytesWithHeader, 0, - onDiskBlockBytesWithHeader.length + numBytes, - uncompressedBlockBytesWithHeader.length, onDiskBlockBytesWithHeader.length); - // Set the header for the uncompressed bytes (for cache-on-write) -- IFF different from - // onDiskBlockBytesWithHeader array. 
- if (onDiskBlockBytesWithHeader != uncompressedBlockBytesWithHeader) { - putHeader(uncompressedBlockBytesWithHeader, 0, - onDiskBlockBytesWithHeader.length + numBytes, - uncompressedBlockBytesWithHeader.length, onDiskBlockBytesWithHeader.length); - } + putHeader(onDiskBlockBytesWithHeader, + onDiskBlockBytesWithHeader.size() + numBytes, + baosInMemory.size(), onDiskBlockBytesWithHeader.size()); if (onDiskChecksum.length != numBytes) { onDiskChecksum = new byte[numBytes]; } ChecksumUtil.generateChecksums( - onDiskBlockBytesWithHeader, 0, onDiskBlockBytesWithHeader.length, + onDiskBlockBytesWithHeader.getBuffer(), 0,onDiskBlockBytesWithHeader.size(), onDiskChecksum, 0, fileContext.getChecksumType(), fileContext.getBytesPerChecksum()); } @@ -1081,6 +1071,11 @@ public class HFileBlock implements Cacheable { Bytes.putInt(dest, offset, onDiskDataSize); } + private void putHeader(ByteArrayOutputStream dest, int onDiskSize, + int uncompressedSize, int onDiskDataSize) { + putHeader(dest.getBuffer(),0, onDiskSize, uncompressedSize, onDiskDataSize); + } + /** * Similar to {@link #writeHeaderAndData(FSDataOutputStream)}, but records * the offset of this block so that it can be referenced in the next block @@ -1113,7 +1108,7 @@ public class HFileBlock implements Cacheable { protected void finishBlockAndWriteHeaderAndData(DataOutputStream out) throws IOException { ensureBlockReady(); - out.write(onDiskBlockBytesWithHeader); + out.write(onDiskBlockBytesWithHeader.getBuffer(), 0, onDiskBlockBytesWithHeader.size()); out.write(onDiskChecksum); } @@ -1132,12 +1127,12 @@ public class HFileBlock implements Cacheable { // This is not very optimal, because we are doing an extra copy. // But this method is used only by unit tests. 
byte[] output = - new byte[onDiskBlockBytesWithHeader.length + new byte[onDiskBlockBytesWithHeader.size() + onDiskChecksum.length]; - System.arraycopy(onDiskBlockBytesWithHeader, 0, output, 0, - onDiskBlockBytesWithHeader.length); + System.arraycopy(onDiskBlockBytesWithHeader.getBuffer(), 0, output, 0, + onDiskBlockBytesWithHeader.size()); System.arraycopy(onDiskChecksum, 0, output, - onDiskBlockBytesWithHeader.length, onDiskChecksum.length); + onDiskBlockBytesWithHeader.size(), onDiskChecksum.length); return output; } @@ -1165,7 +1160,7 @@ public class HFileBlock implements Cacheable { */ int getOnDiskSizeWithoutHeader() { expectState(State.BLOCK_READY); - return onDiskBlockBytesWithHeader.length + + return onDiskBlockBytesWithHeader.size() + onDiskChecksum.length - HConstants.HFILEBLOCK_HEADER_SIZE; } @@ -1178,7 +1173,7 @@ public class HFileBlock implements Cacheable { */ int getOnDiskSizeWithHeader() { expectState(State.BLOCK_READY); - return onDiskBlockBytesWithHeader.length + onDiskChecksum.length; + return onDiskBlockBytesWithHeader.size() + onDiskChecksum.length; } /** @@ -1186,7 +1181,7 @@ public class HFileBlock implements Cacheable { */ int getUncompressedSizeWithoutHeader() { expectState(State.BLOCK_READY); - return uncompressedBlockBytesWithHeader.length - HConstants.HFILEBLOCK_HEADER_SIZE; + return baosInMemory.size() - HConstants.HFILEBLOCK_HEADER_SIZE; } /** @@ -1194,7 +1189,7 @@ public class HFileBlock implements Cacheable { */ int getUncompressedSizeWithHeader() { expectState(State.BLOCK_READY); - return uncompressedBlockBytesWithHeader.length; + return baosInMemory.size(); } /** @return true if a block is being written */ @@ -1215,29 +1210,37 @@ public class HFileBlock implements Cacheable { } /** - * Returns the header followed by the uncompressed data, even if using + * Clones the header followed by the uncompressed data, even if using * compression. This is needed for storing uncompressed blocks in the block * cache. 
Can be called in the "writing" state or the "block ready" state. * Returns only the header and data, does not include checksum data. * - * @return uncompressed block bytes for caching on write + * @return a copy of the uncompressed block bytes for caching on write */ - ByteBuffer getUncompressedBufferWithHeader() { + @VisibleForTesting + ByteBuffer cloneUncompressedBufferWithHeader() { expectState(State.BLOCK_READY); + byte[] uncompressedBlockBytesWithHeader = baosInMemory.toByteArray(); + int numBytes = (int) ChecksumUtil.numBytes( + onDiskBlockBytesWithHeader.size(), + fileContext.getBytesPerChecksum()); + putHeader(uncompressedBlockBytesWithHeader, 0, + onDiskBlockBytesWithHeader.size() + numBytes, + baosInMemory.size(), onDiskBlockBytesWithHeader.size()); return ByteBuffer.wrap(uncompressedBlockBytesWithHeader); } /** - * Returns the header followed by the on-disk (compressed/encoded/encrypted) data. This is + * Clones the header followed by the on-disk (compressed/encoded/encrypted) data. This is * needed for storing packed blocks in the block cache. Expects calling semantics identical to * {@link #getUncompressedBufferWithHeader()}. Returns only the header and data, * Does not include checksum data. * - * @return packed block bytes for caching on write + * @return a copy of the on-disk block bytes for caching on write */ - ByteBuffer getOnDiskBufferWithHeader() { + private ByteBuffer cloneOnDiskBufferWithHeader() { expectState(State.BLOCK_READY); - return ByteBuffer.wrap(onDiskBlockBytesWithHeader); + return ByteBuffer.wrap(onDiskBlockBytesWithHeader.toByteArray()); } private void expectState(State expectedState) { @@ -1268,7 +1271,9 @@ public class HFileBlock implements Cacheable { * the byte buffer passed into the constructor of this newly created * block does not have checksum data even though the header minor * version is MINOR_VERSION_WITH_CHECKSUM. This is indicated by setting a - * 0 value in bytesPerChecksum. + * 0 value in bytesPerChecksum. 
This method copies the on-disk or + * uncompressed data to build the HFileBlock which is used only + * while writing blocks and caching. * * <p>TODO: Should there be an option where a cache can ask that hbase preserve block * checksums for checking after a block comes out of the cache? Otehrwise, cache is responsible @@ -1289,10 +1294,10 @@ public class HFileBlock implements Cacheable { return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(), getUncompressedSizeWithoutHeader(), prevOffset, cacheConf.shouldCacheCompressed(blockType.getCategory())? - getOnDiskBufferWithHeader() : - getUncompressedBufferWithHeader(), + cloneOnDiskBufferWithHeader() : + cloneUncompressedBufferWithHeader(), FILL_HEADER, startOffset, UNSET, - onDiskBlockBytesWithHeader.length + onDiskChecksum.length, newContext); + onDiskBlockBytesWithHeader.size() + onDiskChecksum.length, newContext); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/6bd31090/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java index 1c87af4..68c4587 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java @@ -390,7 +390,7 @@ public class TestHFileBlock { writeTestKeyValues(hbw, blockId, includesMemstoreTS, includesTag); hbw.writeHeaderAndData(os); int headerLen = HConstants.HFILEBLOCK_HEADER_SIZE; - byte[] encodedResultWithHeader = hbw.getUncompressedBufferWithHeader().array(); + byte[] encodedResultWithHeader = hbw.cloneUncompressedBufferWithHeader().array(); final int encodedSize = encodedResultWithHeader.length - headerLen; if (encoding != DataBlockEncoding.NONE) { // We need to account for the two-byte encoding 
algorithm ID that @@ -798,7 +798,7 @@ public class TestHFileBlock { totalSize += hbw.getOnDiskSizeWithHeader(); if (cacheOnWrite) - expectedContents.add(hbw.getUncompressedBufferWithHeader()); + expectedContents.add(hbw.cloneUncompressedBufferWithHeader()); if (detailedLogging) { LOG.info("Written block #" + i + " of type " + bt