This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-28028
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/HBASE-28028 by this push:
     new 5e5c210bf85 HBASE-28028 Read all compressed bytes to a byte array 
before submitting them to decompressor
5e5c210bf85 is described below

commit 5e5c210bf8553b9a9e09adc7aedf6a10fb5953cc
Author: Duo Zhang <zhang...@apache.org>
AuthorDate: Thu Aug 17 22:54:19 2023 +0800

    HBASE-28028 Read all compressed bytes to a byte array before submitting 
them to decompressor
---
 .../hbase/regionserver/wal/CompressionContext.java |  8 +--
 ...LDecompressionBoundedDelegatingInputStream.java | 78 ++++++++++------------
 2 files changed, 40 insertions(+), 46 deletions(-)

diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
index 73cf4821db0..8f6d1792954 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
@@ -28,7 +28,6 @@ import java.util.Map;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.io.BoundedDelegatingInputStream;
 import org.apache.hadoop.hbase.io.TagCompressionContext;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.util.Dictionary;
@@ -77,7 +76,7 @@ public class CompressionContext {
     private final Compression.Algorithm algorithm;
     private Compressor compressor;
     private Decompressor decompressor;
-    private BoundedDelegatingInputStream lowerIn;
+    private WALDecompressionBoundedDelegatingInputStream lowerIn;
     private ByteArrayOutputStream lowerOut;
     private InputStream compressedIn;
     private OutputStream compressedOut;
@@ -115,13 +114,11 @@ public class CompressionContext {
 
       // Create the input streams here the first time around.
       if (compressedIn == null) {
-        lowerIn = new BoundedDelegatingInputStream(in, inLength);
+        lowerIn = new WALDecompressionBoundedDelegatingInputStream(in);
         if (decompressor == null) {
           decompressor = algorithm.getDecompressor();
         }
         compressedIn = algorithm.createDecompressionStream(lowerIn, 
decompressor, IO_BUFFER_SIZE);
-      } else {
-        lowerIn.setDelegate(in, inLength);
       }
       if (outLength == 0) {
         // The BufferedInputStream will return earlier and skip reading 
anything if outLength == 0,
@@ -131,6 +128,7 @@ public class CompressionContext {
         // such as data loss when splitting wal or replicating wal.
         IOUtils.skipFully(in, inLength);
       } else {
+        lowerIn.resetLimit(inLength);
         IOUtils.readFully(compressedIn, outArray, outOffset, outLength);
       }
     }
diff --git 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedDelegatingInputStream.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALDecompressionBoundedDelegatingInputStream.java
similarity index 57%
rename from 
hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedDelegatingInputStream.java
rename to 
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALDecompressionBoundedDelegatingInputStream.java
index 2a6db09050c..688a5053c6e 100644
--- 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedDelegatingInputStream.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALDecompressionBoundedDelegatingInputStream.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,75 +15,76 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hbase.io;
+package org.apache.hadoop.hbase.regionserver.wal;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
+import org.apache.commons.io.IOUtils;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * This is a stream that will only supply bytes from its delegate up to a 
certain limit. When there
- * is an attempt to set the position beyond that it will signal that the input 
is finished.
+ * This class is only used by WAL ValueCompressor for decompression.
+ * <p>
+ * <strong>WARNING: </strong>The implementation is very tricky and does not 
follow the typical
+ * InputStream pattern, so do not use it in any other place.
  */
 @InterfaceAudience.Private
-public class BoundedDelegatingInputStream extends DelegatingInputStream {
+class WALDecompressionBoundedDelegatingInputStream extends InputStream {
 
-  protected long limit;
-  protected long pos;
+  private static final Logger LOG =
+    
LoggerFactory.getLogger(WALDecompressionBoundedDelegatingInputStream.class);
 
-  public BoundedDelegatingInputStream(InputStream in, long limit) {
-    super(in);
-    this.limit = limit;
-    this.pos = 0;
-  }
+  private final InputStream in;
+
+  private long pos;
+
+  private long limit;
 
-  public void setDelegate(InputStream in, long limit) {
+  public WALDecompressionBoundedDelegatingInputStream(InputStream in) {
     this.in = in;
+  }
+
+  public void resetLimit(long limit) {
     this.limit = limit;
     this.pos = 0;
   }
 
-  /**
-   * Call the delegate's {@code read()} method if the current position is less 
than the limit.
-   * @return the byte read or -1 if the end of stream or the limit has been 
reached.
-   */
   @Override
   public int read() throws IOException {
     if (pos >= limit) {
       return -1;
     }
     int result = in.read();
+    if (result < 0) {
+      return -1;
+    }
     pos++;
     return result;
   }
 
-  /**
-   * Call the delegate's {@code read(byte[], int, int)} method if the current 
position is less than
-   * the limit.
-   * @param b   read buffer
-   * @param off Start offset
-   * @param len The number of bytes to read
-   * @return the number of bytes read or -1 if the end of stream or the limit 
has been reached.
-   */
   @Override
-  public int read(final byte[] b, final int off, final int len) throws 
IOException {
+  public int read(byte[] b, int off, int len) throws IOException {
     if (pos >= limit) {
       return -1;
     }
-    long readLen = Math.min(len, limit - pos);
-    int read = in.read(b, off, (int) readLen);
-    if (read < 0) {
+    int readLen = (int) Math.min(len, limit - pos);
+    try {
+      IOUtils.readFully(in, b, off, readLen);
+    } catch (EOFException e) {
+      // The trick here is that we will always try to read enough bytes to fill 
the buffer passed in,
+      // or we reach the end of this compression block; if there are not 
enough bytes, we just
+      // return -1 to let the upper layer fail with EOF
+      // In WAL value decompression this is OK, as if we cannot read all the 
data, we will eventually
+      // get an EOF somewhere
+      LOG.debug("Got EOF while we want to read {} bytes from stream", readLen, 
e);
       return -1;
     }
-    pos += read;
-    return read;
+    return readLen;
   }
 
-  /**
-   * Call the delegate's {@code skip(long)} method.
-   * @param len the number of bytes to skip
-   * @return the actual number of bytes skipped
-   */
   @Override
   public long skip(final long len) throws IOException {
     long skipped = in.skip(Math.min(len, limit - pos));
@@ -91,10 +92,6 @@ public class BoundedDelegatingInputStream extends 
DelegatingInputStream {
     return skipped;
   }
 
-  /**
-   * @return the remaining bytes within the bound if the current position is 
less than the limit, or
-   *         0 otherwise.
-   */
   @Override
   public int available() throws IOException {
     if (pos >= limit) {
@@ -108,5 +105,4 @@ public class BoundedDelegatingInputStream extends 
DelegatingInputStream {
     // successful decompression depends on this behavior.
     return (int) (limit - pos);
   }
-
 }

Reply via email to