This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-28028
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/HBASE-28028 by this push:
     new 5e5c210bf85 HBASE-28028 Read all compressed bytes to a byte array before submitting them to decompressor
5e5c210bf85 is described below

commit 5e5c210bf8553b9a9e09adc7aedf6a10fb5953cc
Author: Duo Zhang <zhang...@apache.org>
AuthorDate: Thu Aug 17 22:54:19 2023 +0800

    HBASE-28028 Read all compressed bytes to a byte array before submitting them to decompressor
---
 .../hbase/regionserver/wal/CompressionContext.java |  8 +--
 ...LDecompressionBoundedDelegatingInputStream.java | 78 ++++++++++------------
 2 files changed, 40 insertions(+), 46 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
index 73cf4821db0..8f6d1792954 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
@@ -28,7 +28,6 @@ import java.util.Map;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.io.BoundedDelegatingInputStream;
 import org.apache.hadoop.hbase.io.TagCompressionContext;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.util.Dictionary;
@@ -77,7 +76,7 @@ public class CompressionContext {
   private final Compression.Algorithm algorithm;
   private Compressor compressor;
   private Decompressor decompressor;
-  private BoundedDelegatingInputStream lowerIn;
+  private WALDecompressionBoundedDelegatingInputStream lowerIn;
   private ByteArrayOutputStream lowerOut;
   private InputStream compressedIn;
   private OutputStream compressedOut;
@@ -115,13 +114,11 @@ public class CompressionContext {
 
       // Create the input streams here the first time around.
       if (compressedIn == null) {
-        lowerIn = new BoundedDelegatingInputStream(in, inLength);
+        lowerIn = new WALDecompressionBoundedDelegatingInputStream(in);
         if (decompressor == null) {
           decompressor = algorithm.getDecompressor();
         }
         compressedIn = algorithm.createDecompressionStream(lowerIn, decompressor, IO_BUFFER_SIZE);
-      } else {
-        lowerIn.setDelegate(in, inLength);
       }
       if (outLength == 0) {
         // The BufferedInputStream will return earlier and skip reading anything if outLength == 0,
@@ -131,6 +128,7 @@ public class CompressionContext {
         // such as data loss when splitting wal or replicating wal.
         IOUtils.skipFully(in, inLength);
       } else {
+        lowerIn.resetLimit(inLength);
         IOUtils.readFully(compressedIn, outArray, outOffset, outLength);
       }
     }
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedDelegatingInputStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALDecompressionBoundedDelegatingInputStream.java
similarity index 57%
rename from hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedDelegatingInputStream.java
rename to hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALDecompressionBoundedDelegatingInputStream.java
index 2a6db09050c..688a5053c6e 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedDelegatingInputStream.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALDecompressionBoundedDelegatingInputStream.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
@@ -15,75 +15,76 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hbase.io;
+package org.apache.hadoop.hbase.regionserver.wal;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
+import org.apache.commons.io.IOUtils;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * This is a stream that will only supply bytes from its delegate up to a certain limit. When there
- * is an attempt to set the position beyond that it will signal that the input is finished.
+ * This class is only used by WAL ValueCompressor for decompression.
+ * <p>
+ * <strong>WARNING: </strong>The implementation is very tricky and does not follow the typical
+ * InputStream pattern, so do not use it in any other places.
 */
 @InterfaceAudience.Private
-public class BoundedDelegatingInputStream extends DelegatingInputStream {
+class WALDecompressionBoundedDelegatingInputStream extends InputStream {
 
-  protected long limit;
-  protected long pos;
+  private static final Logger LOG =
+    LoggerFactory.getLogger(WALDecompressionBoundedDelegatingInputStream.class);
 
-  public BoundedDelegatingInputStream(InputStream in, long limit) {
-    super(in);
-    this.limit = limit;
-    this.pos = 0;
-  }
+  private final InputStream in;
+
+  private long pos;
+
+  private long limit;
 
-  public void setDelegate(InputStream in, long limit) {
+  public WALDecompressionBoundedDelegatingInputStream(InputStream in) {
     this.in = in;
+  }
+
+  public void resetLimit(long limit) {
     this.limit = limit;
     this.pos = 0;
   }
 
-  /**
-   * Call the delegate's {@code read()} method if the current position is less than the limit.
-   * @return the byte read or -1 if the end of stream or the limit has been reached.
-   */
   @Override
   public int read() throws IOException {
     if (pos >= limit) {
       return -1;
     }
     int result = in.read();
+    if (result < 0) {
+      return -1;
+    }
     pos++;
     return result;
   }
 
-  /**
-   * Call the delegate's {@code read(byte[], int, int)} method if the current position is less than
-   * the limit.
-   * @param b   read buffer
-   * @param off Start offset
-   * @param len The number of bytes to read
-   * @return the number of bytes read or -1 if the end of stream or the limit has been reached.
-   */
   @Override
-  public int read(final byte[] b, final int off, final int len) throws IOException {
+  public int read(byte[] b, int off, int len) throws IOException {
     if (pos >= limit) {
       return -1;
     }
-    long readLen = Math.min(len, limit - pos);
-    int read = in.read(b, off, (int) readLen);
-    if (read < 0) {
+    int readLen = (int) Math.min(len, limit - pos);
+    try {
+      IOUtils.readFully(in, b, off, readLen);
+    } catch (EOFException e) {
+      // This is the tricky part: we always try to read enough bytes to fill the buffer passed in,
+      // or until we reach the end of this compression block. If there are not enough bytes, we
+      // just return -1 to let the upper layer fail with EOF.
+      // In WAL value decompression this is OK, as if we can not read all the data, we will
+      // finally get an EOF somewhere.
+      LOG.debug("Got EOF while we want to read {} bytes from stream", readLen, e);
       return -1;
     }
-    pos += read;
-    return read;
+    pos += readLen;
+    return readLen;
   }
 
-  /**
-   * Call the delegate's {@code skip(long)} method.
-   * @param len the number of bytes to skip
-   * @return the actual number of bytes skipped
-   */
   @Override
   public long skip(final long len) throws IOException {
     long skipped = in.skip(Math.min(len, limit - pos));
@@ -91,10 +92,6 @@ public class BoundedDelegatingInputStream extends DelegatingInputStream {
     return skipped;
   }
 
-  /**
-   * @return the remaining bytes within the bound if the current position is less than the limit, or
-   *         0 otherwise.
-   */
   @Override
   public int available() throws IOException {
     if (pos >= limit) {
@@ -108,5 +105,4 @@ public class BoundedDelegatingInputStream extends DelegatingInputStream {
     // successful decompression depends on this behavior.
     return (int) (limit - pos);
   }
-
 }
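
To make the tricky contract described in the class javadoc concrete, here is a minimal, self-contained Java sketch of the read semantics the renamed stream implements: within one compression block of `limit` bytes, read(byte[], int, int) either completely fills the request (clamped to the remaining limit) or returns -1, never a short read. All names below (BoundedReadContractSketch, block, buf) are illustrative only and are not part of this commit or of HBase.

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

// Illustrative sketch only -- these names do not exist in HBase.
public class BoundedReadContractSketch {

  private final InputStream in;
  private long pos;
  private long limit;

  BoundedReadContractSketch(InputStream in) {
    this.in = in;
  }

  // Rewind the bound for the next compressed block, like resetLimit() above.
  void resetLimit(long limit) {
    this.limit = limit;
    this.pos = 0;
  }

  // Either fill the clamped request completely or return -1; never a short read.
  int read(byte[] b, int off, int len) throws IOException {
    if (pos >= limit) {
      return -1; // this block is exhausted
    }
    int readLen = (int) Math.min(len, limit - pos);
    int total = 0;
    // A plain InputStream.read may legally return fewer bytes than asked for,
    // so loop until the clamped request is satisfied -- the core of the fix.
    while (total < readLen) {
      int n = in.read(b, off + total, readLen - total);
      if (n < 0) {
        return -1; // truncated block: let the upper layer fail with EOF later
      }
      total += n;
    }
    pos += readLen;
    return readLen;
  }

  public static void main(String[] args) throws IOException {
    byte[] block = { 1, 2, 3, 4, 5 };
    BoundedReadContractSketch s =
      new BoundedReadContractSketch(new ByteArrayInputStream(block));
    s.resetLimit(block.length);
    byte[] buf = new byte[16];
    System.out.println(s.read(buf, 0, buf.length)); // 5: clamped and fully read
    System.out.println(s.read(buf, 0, buf.length)); // -1: limit reached
  }
}

The design point, as the commit title says, is to hand the decompressor all the compressed bytes of a block rather than whatever a single read from the delegate happens to return, since the wrapping decompression stream does not tolerate short reads inside a block.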