This is an automated email from the ASF dual-hosted git repository.

openinx pushed a commit to branch HBASE-21879
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/HBASE-21879 by this push:
     new 08c8396  HBASE-21937 Make the Compression#decompress can accept 
ByteBuff as input
08c8396 is described below

commit 08c83962c7df04db19446fe4587bbea092a05d5c
Author: huzheng <[email protected]>
AuthorDate: Tue Apr 2 20:44:08 2019 +0800

    HBASE-21937 Make the Compression#decompress can accept ByteBuff as input
---
 .../hadoop/hbase/io/compress/Compression.java      | 51 ++++++++--------------
 .../encoding/HFileBlockDefaultDecodingContext.java | 11 +++--
 .../apache/hadoop/hbase/io/util}/BlockIOUtils.java | 45 ++++++++++++++++---
 .../apache/hadoop/hbase/io/hfile/HFileBlock.java   | 29 +++++++-----
 .../io/encoding/TestLoadAndSwitchEncodeOnDisk.java |  2 -
 .../hadoop/hbase/io/hfile/TestBlockIOUtils.java    |  1 +
 .../hadoop/hbase/io/hfile/TestHFileBlock.java      | 18 ++++----
 7 files changed, 90 insertions(+), 67 deletions(-)

diff --git 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/Compression.java
 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/Compression.java
index d258ba2..3004973 100644
--- 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/Compression.java
+++ 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/Compression.java
@@ -25,7 +25,8 @@ import java.io.OutputStream;
 
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.hbase.io.util.BlockIOUtils;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.io.compress.CodecPool;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionInputStream;
@@ -438,45 +439,29 @@ public final class Compression {
   }
 
   /**
-   * Decompresses data from the given stream using the configured compression
-   * algorithm. It will throw an exception if the dest buffer does not have
-   * enough space to hold the decompressed data.
-   *
-   * @param dest
-   *          the output bytes buffer
-   * @param destOffset
-   *          start writing position of the output buffer
-   * @param bufferedBoundedStream
-   *          a stream to read compressed data from, bounded to the exact 
amount
+   * Decompresses data from the given stream using the configured compression 
algorithm. It will
+   * throw an exception if the dest buffer does not have enough space to hold 
the decompressed data.
+   * @param dest the output buffer
+   * @param bufferedBoundedStream a stream to read compressed data from, 
bounded to the exact amount
    *          of compressed data
-   * @param compressedSize
-   *          compressed data size, header not included
-   * @param uncompressedSize
-   *          uncompressed data size, header not included
-   * @param compressAlgo
-   *          compression algorithm used
-   * @throws IOException
+   * @param uncompressedSize uncompressed data size, header not included
+   * @param compressAlgo compression algorithm used
+   * @throws IOException if any IO error happen
    */
-  public static void decompress(byte[] dest, int destOffset,
-      InputStream bufferedBoundedStream, int compressedSize,
-      int uncompressedSize, Compression.Algorithm compressAlgo)
-      throws IOException {
-
-    if (dest.length - destOffset < uncompressedSize) {
-      throw new IllegalArgumentException(
-          "Output buffer does not have enough space to hold "
-              + uncompressedSize + " decompressed bytes, available: "
-              + (dest.length - destOffset));
+  public static void decompress(ByteBuff dest, InputStream 
bufferedBoundedStream,
+      int uncompressedSize, Compression.Algorithm compressAlgo) throws 
IOException {
+    if (dest.remaining() < uncompressedSize) {
+      throw new IllegalArgumentException("Output buffer does not have enough 
space to hold "
+          + uncompressedSize + " decompressed bytes, available: " + 
dest.remaining());
     }
 
     Decompressor decompressor = null;
     try {
       decompressor = compressAlgo.getDecompressor();
-      InputStream is = compressAlgo.createDecompressionStream(
-          bufferedBoundedStream, decompressor, 0);
-
-      IOUtils.readFully(is, dest, destOffset, uncompressedSize);
-      is.close();
+      try (InputStream is =
+          compressAlgo.createDecompressionStream(bufferedBoundedStream, 
decompressor, 0)) {
+        BlockIOUtils.readFullyWithHeapBuffer(is, dest, uncompressedSize);
+      }
     } finally {
       if (decompressor != null) {
         compressAlgo.returnDecompressor(decompressor);
diff --git 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java
 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java
index d5bf58c..97d0e6b 100644
--- 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java
+++ 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.io.crypto.Cipher;
 import org.apache.hadoop.hbase.io.crypto.Decryptor;
 import org.apache.hadoop.hbase.io.crypto.Encryption;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.util.BlockIOUtils;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -87,14 +88,12 @@ public class HFileBlockDefaultDecodingContext implements
       }
 
       Compression.Algorithm compression = fileContext.getCompression();
-      assert blockBufferWithoutHeader.hasArray();
       if (compression != Compression.Algorithm.NONE) {
-        Compression.decompress(blockBufferWithoutHeader.array(),
-            blockBufferWithoutHeader.arrayOffset(), dataInputStream, 
onDiskSizeWithoutHeader,
-            uncompressedSizeWithoutHeader, compression);
+        Compression.decompress(blockBufferWithoutHeader, dataInputStream,
+          uncompressedSizeWithoutHeader, compression);
       } else {
-        IOUtils.readFully(dataInputStream, blockBufferWithoutHeader.array(),
-            blockBufferWithoutHeader.arrayOffset(), onDiskSizeWithoutHeader);
+        BlockIOUtils.readFullyWithHeapBuffer(dataInputStream, 
blockBufferWithoutHeader,
+          onDiskSizeWithoutHeader);
       }
     } finally {
       byteBuffInputStream.close();
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockIOUtils.java 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/BlockIOUtils.java
similarity index 86%
rename from 
hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockIOUtils.java
rename to 
hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/BlockIOUtils.java
index dbd5b2e..a98a478 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockIOUtils.java
+++ 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/BlockIOUtils.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.hbase.io.hfile;
+package org.apache.hadoop.hbase.io.util;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -29,9 +29,14 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 
 @InterfaceAudience.Private
-class BlockIOUtils {
+public final class BlockIOUtils {
 
-  static boolean isByteBufferReadable(FSDataInputStream is) {
+  // Disallow instantiation
+  private BlockIOUtils() {
+
+  }
+
+  public static boolean isByteBufferReadable(FSDataInputStream is) {
     InputStream cur = is.getWrappedStream();
     for (;;) {
       if ((cur instanceof FSDataInputStream)) {
@@ -50,7 +55,7 @@ class BlockIOUtils {
    * @param length bytes to read.
    * @throws IOException exception to throw if any error happen
    */
-  static void readFully(ByteBuff buf, FSDataInputStream dis, int length) 
throws IOException {
+  public static void readFully(ByteBuff buf, FSDataInputStream dis, int 
length) throws IOException {
     if (!isByteBufferReadable(dis)) {
       // If InputStream does not support the ByteBuffer read, just read to 
heap and copy bytes to
       // the destination ByteBuff.
@@ -82,6 +87,32 @@ class BlockIOUtils {
   }
 
   /**
+   * Copying bytes from InputStream to {@link ByteBuff} by using an temporary 
heap byte[] (default
+   * size is 1024 now).
+   * @param in the InputStream to read
+   * @param out the destination {@link ByteBuff}
+   * @param length to read
+   * @throws IOException if any io error encountered.
+   */
+  public static void readFullyWithHeapBuffer(InputStream in, ByteBuff out, int 
length)
+      throws IOException {
+    byte[] buffer = new byte[1024];
+    if (length < 0) {
+      throw new IllegalArgumentException("Length must not be negative: " + 
length);
+    }
+    int remain = length, count;
+    while (remain > 0) {
+      count = in.read(buffer, 0, Math.min(remain, buffer.length));
+      if (count < 0) {
+        throw new IOException(
+            "Premature EOF from inputStream, but still need " + remain + " 
bytes");
+      }
+      out.put(buffer, 0, count);
+      remain -= count;
+    }
+  }
+
+  /**
    * Read from an input stream at least <code>necessaryLen</code> and if 
possible,
    * <code>extraLen</code> also if available. Analogous to
    * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but specifies a 
number of "extra"
@@ -125,8 +156,8 @@ class BlockIOUtils {
    *         ByteBuffers, otherwise we've not read the extraLen bytes yet.
    * @throws IOException if failed to read the necessary bytes.
    */
-  static boolean readWithExtra(ByteBuff buf, FSDataInputStream dis, int 
necessaryLen, int extraLen)
-      throws IOException {
+  public static boolean readWithExtra(ByteBuff buf, FSDataInputStream dis, int 
necessaryLen,
+      int extraLen) throws IOException {
     if (!isByteBufferReadable(dis)) {
       // If InputStream does not support the ByteBuffer read, just read to 
heap and copy bytes to
       // the destination ByteBuff.
@@ -174,7 +205,7 @@ class BlockIOUtils {
    * @return true if and only if extraLen is > 0 and reading those extra bytes 
was successful
    * @throws IOException if failed to read the necessary bytes
    */
-  static boolean preadWithExtra(ByteBuff buff, FSDataInputStream dis, long 
position,
+  public static boolean preadWithExtra(ByteBuff buff, FSDataInputStream dis, 
long position,
       int necessaryLen, int extraLen) throws IOException {
     int remain = necessaryLen + extraLen;
     byte[] buf = new byte[remain];
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index 2c8fa4d..a3738d6 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.io.ByteBuffAllocator;
+import org.apache.hadoop.hbase.io.util.BlockIOUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -527,14 +528,22 @@ public class HFileBlock implements Cacheable {
   }
 
   /**
-   * Returns a buffer that does not include the header or checksum.
-   *
+   * Returns a buffer that does not include the header and checksum.
    * @return the buffer with header skipped and checksum omitted.
    */
   public ByteBuff getBufferWithoutHeader() {
+    return this.getBufferWithoutHeader(false);
+  }
+
+  /**
+   * Returns a buffer that does not include the header or checksum.
+   * @param withChecksum to indicate whether include the checksum or not.
+   * @return the buffer with header skipped and checksum omitted.
+   */
+  public ByteBuff getBufferWithoutHeader(boolean withChecksum) {
     ByteBuff dup = getBufferReadOnly();
-    // Now set it up so Buffer spans content only -- no header or no checksums.
-    return dup.position(headerSize()).limit(buf.limit() - 
totalChecksumBytes()).slice();
+    int delta = withChecksum ? 0 : totalChecksumBytes();
+    return dup.position(headerSize()).limit(buf.limit() - delta).slice();
   }
 
   /**
@@ -608,8 +617,9 @@ public class HFileBlock implements Cacheable {
     // We might optionally allocate HFILEBLOCK_HEADER_SIZE more bytes to read 
the next
     // block's header, so there are two sensible values for buffer capacity.
     int hdrSize = headerSize();
-    if (dup.capacity() != expectedBufLimit && dup.capacity() != 
expectedBufLimit + hdrSize) {
-      throw new AssertionError("Invalid buffer capacity: " + dup.capacity() +
+    dup.rewind();
+    if (dup.remaining() != expectedBufLimit && dup.remaining() != 
expectedBufLimit + hdrSize) {
+      throw new AssertionError("Invalid buffer capacity: " + dup.remaining() +
           ", expected " + expectedBufLimit + " or " + (expectedBufLimit + 
hdrSize));
     }
   }
@@ -671,15 +681,15 @@ public class HFileBlock implements Cacheable {
     HFileBlock unpacked = new HFileBlock(this);
     unpacked.allocateBuffer(); // allocates space for the decompressed block
 
-    HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA ?
-      reader.getBlockDecodingContext() : 
reader.getDefaultBlockDecodingContext();
+    HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA
+        ? reader.getBlockDecodingContext() : 
reader.getDefaultBlockDecodingContext();
 
     ByteBuff dup = this.buf.duplicate();
     dup.position(this.headerSize());
     dup = dup.slice();
 
     ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(),
-      unpacked.getUncompressedSizeWithoutHeader(), 
unpacked.getBufferWithoutHeader(), dup);
+      unpacked.getUncompressedSizeWithoutHeader(), 
unpacked.getBufferWithoutHeader(true), dup);
 
     return unpacked;
   }
@@ -697,7 +707,6 @@ public class HFileBlock implements Cacheable {
     ByteBuff newBuf = allocator.allocate(capacityNeeded);
 
     // Copy header bytes into newBuf.
-    // newBuf is HBB so no issue in calling array()
     buf.position(0);
     newBuf.put(0, buf, 0, headerSize);
 
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestLoadAndSwitchEncodeOnDisk.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestLoadAndSwitchEncodeOnDisk.java
index 2e4cbe4..f2630ae 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestLoadAndSwitchEncodeOnDisk.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestLoadAndSwitchEncodeOnDisk.java
@@ -38,7 +38,6 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.TestMiniClusterLoadSequential;
 import org.apache.hadoop.hbase.util.Threads;
 import org.junit.ClassRule;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runners.Parameterized.Parameters;
@@ -74,7 +73,6 @@ public class TestLoadAndSwitchEncodeOnDisk extends 
TestMiniClusterLoadSequential
 
   @Override
   @Test
-  @Ignore("TODO Ignore this UT temporarily, will fix this in the critical 
HBASE-21937.")
   public void loadTest() throws Exception {
     Admin admin = TEST_UTIL.getAdmin();
 
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockIOUtils.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockIOUtils.java
index 60180e6..a386f49 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockIOUtils.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockIOUtils.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.io.util.BlockIOUtils;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.nio.MultiByteBuff;
 import org.apache.hadoop.hbase.nio.SingleByteBuff;
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
index 2733ca2..af42a24 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
@@ -97,8 +97,7 @@ public class TestHFileBlock {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(TestHFileBlock.class);
 
-  // TODO let uncomment the GZ algorithm in HBASE-21937, because no support BB 
unpack yet.
-  static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = { NONE, /* GZ 
*/ };
+  static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = { NONE, GZ };
 
   private static final int NUM_TEST_BLOCKS = 1000;
   private static final int NUM_READER_THREADS = 26;
@@ -623,7 +622,7 @@ public class TestHFileBlock {
             if (detailedLogging) {
               LOG.info("Reading block #" + i + " at offset " + curOffset);
             }
-            HFileBlock b = hbr.readBlockData(curOffset, -1, pread, false, 
true);
+            HFileBlock b = hbr.readBlockData(curOffset, -1, pread, false, 
false);
             if (detailedLogging) {
               LOG.info("Block #" + i + ": " + b);
             }
@@ -638,7 +637,7 @@ public class TestHFileBlock {
             // Now re-load this block knowing the on-disk size. This tests a
             // different branch in the loader.
             HFileBlock b2 =
-                hbr.readBlockData(curOffset, b.getOnDiskSizeWithHeader(), 
pread, false, true);
+                hbr.readBlockData(curOffset, b.getOnDiskSizeWithHeader(), 
pread, false, false);
             b2.sanityCheck();
 
             assertEquals(b.getBlockType(), b2.getBlockType());
@@ -667,11 +666,10 @@ public class TestHFileBlock {
               // expectedContents have header + data only
               ByteBuff bufRead = newBlock.getBufferReadOnly();
               ByteBuffer bufExpected = expectedContents.get(i);
-              boolean bytesAreCorrect = Bytes.compareTo(bufRead.array(),
-                  bufRead.arrayOffset(),
-                  bufRead.limit() - newBlock.totalChecksumBytes(),
-                  bufExpected.array(), bufExpected.arrayOffset(),
-                  bufExpected.limit()) == 0;
+              byte[] tmp = new byte[bufRead.limit() - 
newBlock.totalChecksumBytes()];
+              bufRead.get(tmp, 0, tmp.length);
+              boolean bytesAreCorrect = Bytes.compareTo(tmp, 0, tmp.length, 
bufExpected.array(),
+                bufExpected.arrayOffset(), bufExpected.limit()) == 0;
               String wrongBytesMsg = "";
 
               if (!bytesAreCorrect) {
@@ -702,6 +700,8 @@ public class TestHFileBlock {
               if (newBlock != b) {
                 assertTrue(b.release());
               }
+            } else {
+              assertTrue(b.release());
             }
           }
           assertEquals(curOffset, fs.getFileStatus(path).getLen());

Reply via email to