This is an automated email from the ASF dual-hosted git repository.

wchevreuil pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new eaa47c5cd4a HBASE-27264 Add options to consider compressed size when 
delimiting blocks during hfile writes (#4675)
eaa47c5cd4a is described below

commit eaa47c5cd4a71c4a3f5b0e8a1c7fc7bd1acd75bf
Author: Wellington Ramos Chevreuil <wchevre...@apache.org>
AuthorDate: Mon Aug 15 22:35:35 2022 +0100

    HBASE-27264 Add options to consider compressed size when delimiting blocks 
during hfile writes (#4675)
    
    Signed-off-by: Tak Lon (Stephen) Wu <tak...@apache.org>
    Signed-off-by: Ankit Singhal <an...@apache.org>
---
 .../io/hfile/BlockCompressedSizePredicator.java    | 59 +++++++++++++++++
 .../apache/hadoop/hbase/io/hfile/HFileBlock.java   | 39 ++++++++++--
 .../hadoop/hbase/io/hfile/HFileWriterImpl.java     |  6 +-
 .../PreviousBlockCompressionRatePredicator.java    | 62 ++++++++++++++++++
 .../io/hfile/UncompressedBlockSizePredicator.java  | 49 ++++++++++++++
 .../hadoop/hbase/regionserver/TestHStoreFile.java  | 74 ++++++++++++++++++++++
 6 files changed, 283 insertions(+), 6 deletions(-)

diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCompressedSizePredicator.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCompressedSizePredicator.java
new file mode 100644
index 00000000000..a90e04fe5ad
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCompressedSizePredicator.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Allows for defining different compression rate predicates on its 
implementing classes. Useful
+ * when compression is in place, and we want to define block size based on the 
compressed size,
+ * rather than the default behaviour that considers the uncompressed size 
only. Since we don't
+ * actually know the compressed size until we actual apply compression in the 
block byte buffer, we
+ * need to "predicate" this compression rate and minimize compression 
execution to avoid excessive
+ * resources usage. Different approaches for predicating the compressed block 
size can be defined by
+ * implementing classes. The <code>updateLatestBlockSizes</code> allows for 
updating uncompressed
+ * and compressed size values, and is called during block finishing (when we 
finally apply
+ * compression on the block data). Final block size predicate logic is 
implemented in
+ * <code>shouldFinishBlock</code>, which is called by the block writer once 
uncompressed size has
+ * reached the configured BLOCK size, and additional checks should be applied 
to decide if the block
+ * can be finished.
+ */
+@InterfaceAudience.Private
+public interface BlockCompressedSizePredicator {
+
+  String BLOCK_COMPRESSED_SIZE_PREDICATOR = 
"hbase.block.compressed.size.predicator";
+
+  String MAX_BLOCK_SIZE_UNCOMPRESSED = "hbase.block.max.size.uncompressed";
+
+  /**
+   * Updates the predicator with both compressed and uncompressed sizes of 
latest block written. To
+   * be called once the block is finshed and flushed to disk after compression.
+   * @param context      the HFileContext containg the configured max block 
size.
+   * @param uncompressed the uncompressed size of last block written.
+   * @param compressed   the compressed size of last block written.
+   */
+  void updateLatestBlockSizes(HFileContext context, int uncompressed, int 
compressed);
+
+  /**
+   * Decides if the block should be finished based on the comparison of its 
uncompressed size
+   * against an adjusted size based on a predicated compression factor.
+   * @param uncompressed true if the block should be finished. n
+   */
+  boolean shouldFinishBlock(int uncompressed);
+
+}
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index f68ffffa94a..8e04580874f 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.io.hfile;
 
 import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP;
+import static 
org.apache.hadoop.hbase.io.hfile.BlockCompressedSizePredicator.BLOCK_COMPRESSED_SIZE_PREDICATOR;
 import static 
org.apache.hadoop.hbase.io.hfile.trace.HFileContextAttributesBuilderConsumer.CONTEXT_KEY;
 
 import io.opentelemetry.api.common.Attributes;
@@ -64,6 +65,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ChecksumType;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -463,7 +465,7 @@ public class HFileBlock implements Cacheable {
   }
 
   /** Returns the uncompressed size of data part (header and checksum 
excluded). */
-  int getUncompressedSizeWithoutHeader() {
+  public int getUncompressedSizeWithoutHeader() {
     return uncompressedSizeWithoutHeader;
   }
 
@@ -740,6 +742,10 @@ public class HFileBlock implements Cacheable {
       BLOCK_READY
     }
 
+    private int maxSizeUnCompressed;
+
+    private BlockCompressedSizePredicator compressedSizePredicator;
+
     /** Writer state. Used to ensure the correct usage protocol. */
     private State state = State.INIT;
 
@@ -818,11 +824,11 @@ public class HFileBlock implements Cacheable {
      */
     public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder,
       HFileContext fileContext) {
-      this(conf, dataBlockEncoder, fileContext, ByteBuffAllocator.HEAP);
+      this(conf, dataBlockEncoder, fileContext, ByteBuffAllocator.HEAP, 
fileContext.getBlocksize());
     }
 
     public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder,
-      HFileContext fileContext, ByteBuffAllocator allocator) {
+      HFileContext fileContext, ByteBuffAllocator allocator, int 
maxSizeUnCompressed) {
       if (fileContext.getBytesPerChecksum() < 
HConstants.HFILEBLOCK_HEADER_SIZE) {
         throw new RuntimeException("Unsupported value of bytesPerChecksum. " + 
" Minimum is "
           + HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is "
@@ -845,6 +851,10 @@ public class HFileBlock implements Cacheable {
       // TODO: Why fileContext saved away when we have dataBlockEncoder and/or
       // defaultDataBlockEncoder?
       this.fileContext = fileContext;
+      this.compressedSizePredicator = (BlockCompressedSizePredicator) 
ReflectionUtils.newInstance(
+        conf.getClass(BLOCK_COMPRESSED_SIZE_PREDICATOR, 
UncompressedBlockSizePredicator.class),
+        new Configuration(conf));
+      this.maxSizeUnCompressed = maxSizeUnCompressed;
     }
 
     /**
@@ -897,6 +907,15 @@ public class HFileBlock implements Cacheable {
       finishBlock();
     }
 
+    public boolean checkBoundariesWithPredicate() {
+      int rawBlockSize = encodedBlockSizeWritten();
+      if (rawBlockSize >= maxSizeUnCompressed) {
+        return true;
+      } else {
+        return compressedSizePredicator.shouldFinishBlock(rawBlockSize);
+      }
+    }
+
     /**
      * Finish up writing of the block. Flushes the compressing stream (if 
using compression), fills
      * out the header, does any compression/encryption of bytes to flush out 
to disk, and manages
@@ -911,6 +930,11 @@ public class HFileBlock implements Cacheable {
       userDataStream.flush();
       prevOffset = prevOffsetByType[blockType.getId()];
 
+      // We need to cache the unencoded/uncompressed size before changing the 
block state
+      int rawBlockSize = 0;
+      if (this.getEncodingState() != null) {
+        rawBlockSize = blockSizeWritten();
+      }
       // We need to set state before we can package the block up for 
cache-on-write. In a way, the
       // block is ready, but not yet encoded or compressed.
       state = State.BLOCK_READY;
@@ -931,6 +955,10 @@ public class HFileBlock implements Cacheable {
       onDiskBlockBytesWithHeader.reset();
       onDiskBlockBytesWithHeader.write(compressAndEncryptDat.get(),
         compressAndEncryptDat.getOffset(), compressAndEncryptDat.getLength());
+      // Update raw and compressed sizes in the predicate
+      compressedSizePredicator.updateLatestBlockSizes(fileContext, 
rawBlockSize,
+        onDiskBlockBytesWithHeader.size());
+
       // Calculate how many bytes we need for checksum on the tail of the 
block.
       int numBytes = (int) 
ChecksumUtil.numBytes(onDiskBlockBytesWithHeader.size(),
         fileContext.getBytesPerChecksum());
@@ -938,6 +966,7 @@ public class HFileBlock implements Cacheable {
       // Put the header for the on disk bytes; header currently is unfilled-out
       putHeader(onDiskBlockBytesWithHeader, onDiskBlockBytesWithHeader.size() 
+ numBytes,
         baosInMemory.size(), onDiskBlockBytesWithHeader.size());
+
       if (onDiskChecksum.length != numBytes) {
         onDiskChecksum = new byte[numBytes];
       }
@@ -1077,7 +1106,7 @@ public class HFileBlock implements Cacheable {
     /**
      * The uncompressed size of the block data, including header size.
      */
-    int getUncompressedSizeWithHeader() {
+    public int getUncompressedSizeWithHeader() {
       expectState(State.BLOCK_READY);
       return baosInMemory.size();
     }
@@ -1101,7 +1130,7 @@ public class HFileBlock implements Cacheable {
      * block at the moment. Note that this will return zero in the "block 
ready" state as well.
      * @return the number of bytes written
      */
-    int blockSizeWritten() {
+    public int blockSizeWritten() {
       return state != State.WRITING ? 0 : 
this.getEncodingState().getUnencodedDataSizeWritten();
     }
 
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
index 80e333050c6..d58be5fd1ce 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.io.hfile;
 
+import static 
org.apache.hadoop.hbase.io.hfile.BlockCompressedSizePredicator.MAX_BLOCK_SIZE_UNCOMPRESSED;
+
 import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -292,7 +294,8 @@ public class HFileWriterImpl implements HFile.Writer {
       throw new IllegalStateException("finishInit called twice");
     }
     blockWriter =
-      new HFileBlock.Writer(conf, blockEncoder, hFileContext, 
cacheConf.getByteBuffAllocator());
+      new HFileBlock.Writer(conf, blockEncoder, hFileContext, 
cacheConf.getByteBuffAllocator(),
+        conf.getInt(MAX_BLOCK_SIZE_UNCOMPRESSED, hFileContext.getBlocksize() * 
10));
     // Data block index writer
     boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
     dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(blockWriter,
@@ -319,6 +322,7 @@ public class HFileWriterImpl implements HFile.Writer {
       shouldFinishBlock = blockWriter.encodedBlockSizeWritten() >= 
hFileContext.getBlocksize()
         || blockWriter.blockSizeWritten() >= hFileContext.getBlocksize();
     }
+    shouldFinishBlock &= blockWriter.checkBoundariesWithPredicate();
     if (shouldFinishBlock) {
       finishBlock();
       writeInlineBlocks(false);
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PreviousBlockCompressionRatePredicator.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PreviousBlockCompressionRatePredicator.java
new file mode 100644
index 00000000000..be0ee3bb9a7
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PreviousBlockCompressionRatePredicator.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * This BlockCompressedSizePredicator implementation adjusts the block size 
limit based on the
+ * compression rate of the block contents read so far. For the first block, 
adjusted size would be
+ * zero, so it performs a compression of current block contents and calculate 
compression rate and
+ * adjusted size. For subsequent blocks, decision whether the block should be 
finished or not will
+ * be based on the compression rate calculated for the previous block.
+ */
+@InterfaceAudience.Private
+public class PreviousBlockCompressionRatePredicator implements 
BlockCompressedSizePredicator {
+
+  private int adjustedBlockSize;
+  private int compressionRatio = 1;
+  private int configuredMaxBlockSize;
+
+  /**
+   * Recalculates compression rate for the last block and adjusts the block 
size limit as:
+   * BLOCK_SIZE * (uncompressed/compressed).
+   * @param context      HFIleContext containing the configured max block size.
+   * @param uncompressed the uncompressed size of last block written.
+   * @param compressed   the compressed size of last block written.
+   */
+  @Override
+  public void updateLatestBlockSizes(HFileContext context, int uncompressed, 
int compressed) {
+    configuredMaxBlockSize = context.getBlocksize();
+    compressionRatio = uncompressed / compressed;
+    adjustedBlockSize = context.getBlocksize() * compressionRatio;
+  }
+
+  /**
+   * Returns <b>true</b> if the passed uncompressed size is larger than the 
limit calculated by
+   * <code>updateLatestBlockSizes</code>.
+   * @param uncompressed true if the block should be finished. n
+   */
+  @Override
+  public boolean shouldFinishBlock(int uncompressed) {
+    if (uncompressed >= configuredMaxBlockSize) {
+      return uncompressed >= adjustedBlockSize;
+    }
+    return false;
+  }
+}
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/UncompressedBlockSizePredicator.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/UncompressedBlockSizePredicator.java
new file mode 100644
index 00000000000..c259375a97d
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/UncompressedBlockSizePredicator.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * This BlockCompressedSizePredicator implementation doesn't actually performs 
any predicate and
+ * simply returns <b>true</b> on <code>shouldFinishBlock</code>. This is the 
default implementation
+ * if <b>hbase.block.compressed.size.predicator</b> property is not defined.
+ */
+@InterfaceAudience.Private
+public class UncompressedBlockSizePredicator implements 
BlockCompressedSizePredicator {
+
+  /**
+   * Empty implementation. Does nothing.
+   * @param uncompressed the uncompressed size of last block written.
+   * @param compressed   the compressed size of last block written.
+   */
+  @Override
+  public void updateLatestBlockSizes(HFileContext context, int uncompressed, 
int compressed) {
+  }
+
+  /**
+   * Dummy implementation that always returns true. This means, we will be 
only considering the
+   * block uncompressed size for deciding when to finish a block.
+   * @param uncompressed true if the block should be finished. n
+   */
+  @Override
+  public boolean shouldFinishBlock(int uncompressed) {
+    return true;
+  }
+
+}
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java
index 7eff766c0b2..d71b33e82d5 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import static 
org.apache.hadoop.hbase.io.hfile.BlockCompressedSizePredicator.BLOCK_COMPRESSED_SIZE_PREDICATOR;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -39,6 +40,7 @@ import java.util.Map;
 import java.util.OptionalLong;
 import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiFunction;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -61,6 +63,7 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.BlockCache;
 import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
@@ -74,8 +77,10 @@ import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
 import org.apache.hadoop.hbase.io.hfile.HFileInfo;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.io.hfile.PreviousBlockCompressionRatePredicator;
 import org.apache.hadoop.hbase.io.hfile.ReaderContext;
 import org.apache.hadoop.hbase.io.hfile.ReaderContextBuilder;
+import org.apache.hadoop.hbase.io.hfile.UncompressedBlockSizePredicator;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -189,6 +194,24 @@ public class TestHStoreFile {
     }
   }
 
+  public static void writeLargeStoreFile(final StoreFileWriter writer, byte[] 
fam, byte[] qualifier,
+    int rounds) throws IOException {
+    long now = EnvironmentEdgeManager.currentTime();
+    try {
+      for (int i = 0; i < rounds; i++) {
+        for (char d = FIRST_CHAR; d <= LAST_CHAR; d++) {
+          for (char e = FIRST_CHAR; e <= LAST_CHAR; e++) {
+            byte[] b = new byte[] { (byte) d, (byte) e };
+            byte[] key = new byte[] { (byte) i };
+            writer.append(new KeyValue(key, fam, qualifier, now, b));
+          }
+        }
+      }
+    } finally {
+      writer.close();
+    }
+  }
+
   /**
    * Test that our mechanism of writing store files in one region to reference 
store files in other
    * regions works.
@@ -1193,4 +1216,55 @@ public class TestHStoreFile {
     }
   }
 
+  @Test
+  public void testDataBlockSizeCompressed() throws Exception {
+    conf.set(BLOCK_COMPRESSED_SIZE_PREDICATOR,
+      PreviousBlockCompressionRatePredicator.class.getName());
+    testDataBlockSizeWithCompressionRatePredicator(11,
+      (s, c) -> (c > 1 && c < 11) ? s >= BLOCKSIZE_SMALL * 10 : true);
+  }
+
+  @Test
+  public void testDataBlockSizeUnCompressed() throws Exception {
+    conf.set(BLOCK_COMPRESSED_SIZE_PREDICATOR, 
UncompressedBlockSizePredicator.class.getName());
+    testDataBlockSizeWithCompressionRatePredicator(200, (s, c) -> s < 
BLOCKSIZE_SMALL * 10);
+  }
+
+  private void testDataBlockSizeWithCompressionRatePredicator(int 
expectedBlockCount,
+    BiFunction<Integer, Integer, Boolean> validation) throws Exception {
+    Path dir = new Path(new Path(this.testDir, "7e0102"), "familyname");
+    Path path = new Path(dir, "1234567890");
+    DataBlockEncoding dataBlockEncoderAlgo = DataBlockEncoding.FAST_DIFF;
+    cacheConf = new CacheConfig(conf);
+    HFileContext meta =
+      new 
HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).withChecksumType(CKTYPE)
+        
.withBytesPerCheckSum(CKBYTES).withDataBlockEncoding(dataBlockEncoderAlgo)
+        .withCompression(Compression.Algorithm.GZ).build();
+    // Make a store file and write data to it.
+    StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, 
this.fs)
+      .withFilePath(path).withMaxKeyCount(2000).withFileContext(meta).build();
+    writeLargeStoreFile(writer, Bytes.toBytes(name.getMethodName()),
+      Bytes.toBytes(name.getMethodName()), 200);
+    writer.close();
+    HStoreFile storeFile =
+      new HStoreFile(fs, writer.getPath(), conf, cacheConf, BloomType.NONE, 
true);
+    storeFile.initReader();
+    HFile.Reader fReader =
+      HFile.createReader(fs, writer.getPath(), storeFile.getCacheConf(), true, 
conf);
+    FSDataInputStreamWrapper fsdis = new FSDataInputStreamWrapper(fs, 
writer.getPath());
+    long fileSize = fs.getFileStatus(writer.getPath()).getLen();
+    FixedFileTrailer trailer = 
FixedFileTrailer.readFromStream(fsdis.getStream(false), fileSize);
+    long offset = trailer.getFirstDataBlockOffset(), max = 
trailer.getLastDataBlockOffset();
+    HFileBlock block;
+    int blockCount = 0;
+    while (offset <= max) {
+      block = fReader.readBlock(offset, -1, /* cacheBlock */ false, /* pread 
*/ false,
+        /* isCompaction */ false, /* updateCacheMetrics */ false, null, null);
+      offset += block.getOnDiskSizeWithHeader();
+      blockCount++;
+      assertTrue(validation.apply(block.getUncompressedSizeWithoutHeader(), 
blockCount));
+    }
+    assertEquals(expectedBlockCount, blockCount);
+  }
+
 }

Reply via email to