This is an automated email from the ASF dual-hosted git repository.

ndimiduk pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.5 by this push:
     new 7fb061c488d HBASE-29131 Introduce the option for post-compaction validation of HFiles (#6700)
7fb061c488d is described below

commit 7fb061c488d6108771b280abbd8e263a44492bc3
Author: Nick Dimiduk <[email protected]>
AuthorDate: Mon Feb 17 17:00:43 2025 +0100

    HBASE-29131 Introduce the option for post-compaction validation of HFiles (#6700)
    
    Introduces the option for an HStore to fully read the file it just wrote after a flush or
    compaction.
    
    To enable this feature, set `hbase.hstore.validate.read_fully=true`. This is an HStore
    configuration feature, so it can be enabled in hbase-site.xml, in the TableDescriptor, or in the
    ColumnFamilyDescriptor.
    
    Signed-off-by: Peter Somogyi <[email protected]>
---
 .../apache/hadoop/hbase/regionserver/HRegion.java  | 130 +++++++++++++++------
 .../apache/hadoop/hbase/regionserver/HStore.java   |  44 ++++---
 .../hadoop/hbase/regionserver/StoreEngine.java     |  64 +++++++---
 .../hadoop/hbase/regionserver/TestCompaction.java  |  75 +++++++++++-
 .../TestCompaction_HFileWithCorruptBlock.gz        | Bin 0 -> 952 bytes
 5 files changed, 240 insertions(+), 73 deletions(-)

diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 95ac49ad51c..ca0e7db716d 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -2184,6 +2184,102 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
     return compact(compaction, store, throughputController, null);
   }
 
+  /**
+   * <p>
+   * We are trying to remove / relax the region read lock for compaction. 
Let's see what are the
+   * potential race conditions among the operations (user scan, region split, 
region close and
+   * region bulk load).
+   * </p>
+   *
+   * <pre>
+   *   user scan ---> region read lock
+   *   region split --> region close first --> region write lock
+   *   region close --> region write lock
+   *   region bulk load --> region write lock
+   * </pre>
+   * <p>
+   * read lock is compatible with read lock. ---> no problem with user 
scan/read region bulk load
+   * does not cause problem for compaction (no consistency problem, store lock 
will help the store
+   * file accounting). They can run almost concurrently at the region level.
+   * </p>
+   * <p>
+   * The only remaining race condition is between the region close and 
compaction. So we will
+   * evaluate, below, how region close intervenes with compaction if 
compaction does not acquire
+   * region read lock.
+   * </p>
+   * <p>
+   * Here are the steps for compaction:
+   * <ol>
+   * <li>obtain list of StoreFile's</li>
+   * <li>create StoreFileScanner's based on list from #1</li>
+   * <li>perform compaction and save resulting files under tmp dir</li>
+   * <li>swap in compacted files</li>
+   * </ol>
+   * </p>
+   * <p>
+   * #1 is guarded by store lock. This patch does not change this --> no worse 
or better For #2, we
+   * obtain smallest read point (for region) across all the Scanners (for both 
default compactor and
+   * stripe compactor). The read points are for user scans. Region keeps the 
read points for all
+   * currently open user scanners. Compaction needs to know the smallest read 
point so that during
+   * re-write of the hfiles, it can remove the mvcc points for the cells if 
their mvccs are older
+   * than the smallest since they are not needed anymore. This will not 
conflict with compaction.
+   * </p>
+   * <p>
+   * For #3, it can be performed in parallel to other operations.
+   * </p>
+   * <p>
+   * For #4 bulk load and compaction don't conflict with each other on the 
region level (for
+   * multi-family atomicy).
+   * </p>
+   * <p>
+   * Region close and compaction are guarded pretty well by the 'writestate'. 
In HRegion#doClose(),
+   * we have :
+   *
+   * <pre>
+   * synchronized (writestate) {
+   *   // Disable compacting and flushing by background threads for this
+   *   // region.
+   *   canFlush = !writestate.readOnly;
+   *   writestate.writesEnabled = false;
+   *   LOG.debug("Closing " + this + ": disabling compactions & flushes");
+   *   waitForFlushesAndCompactions();
+   * }
+   * </pre>
+   *
+   * {@code waitForFlushesAndCompactions()} would wait for {@code 
writestate.compacting} to come
+   * down to 0. and in {@code HRegion.compact()}
+   *
+   * <pre>
+   *   try {
+   *     synchronized (writestate) {
+   *       if (writestate.writesEnabled) {
+   *         wasStateSet = true;
+   *         ++writestate.compacting;
+   *       } else {
+   *         String msg = "NOT compacting region " + this + ". Writes 
disabled.";
+   *         LOG.info(msg);
+   *         status.abort(msg);
+   *         return false;
+   *       }
+   *     }
+   *   }
+   * </pre>
+   *
+   * Also in {@code compactor.performCompaction()}: check periodically to see 
if a system stop is
+   * requested
+   *
+   * <pre>
+   * if (closeChecker != null && closeChecker.isTimeLimit(store, now)) {
+   *   progress.cancel();
+   *   return false;
+   * }
+   * if (closeChecker != null && closeChecker.isSizeLimit(store, len)) {
+   *   progress.cancel();
+   *   return false;
+   * }
+   * </pre>
+   * </p>
+   */
   public boolean compact(CompactionContext compaction, HStore store,
     ThroughputController throughputController, User user) throws IOException {
     assert compaction != null && compaction.hasSelection();
@@ -2195,40 +2291,6 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
     }
     MonitoredTask status = null;
     boolean requestNeedsCancellation = true;
-    /*
-     * We are trying to remove / relax the region read lock for compaction. 
Let's see what are the
-     * potential race conditions among the operations (user scan, region 
split, region close and
-     * region bulk load). user scan ---> region read lock region split --> 
region close first -->
-     * region write lock region close --> region write lock region bulk load 
--> region write lock
-     * read lock is compatible with read lock. ---> no problem with user 
scan/read region bulk load
-     * does not cause problem for compaction (no consistency problem, store 
lock will help the store
-     * file accounting). They can run almost concurrently at the region level. 
The only remaining
-     * race condition is between the region close and compaction. So we will 
evaluate, below, how
-     * region close intervenes with compaction if compaction does not acquire 
region read lock. Here
-     * are the steps for compaction: 1. obtain list of StoreFile's 2. create 
StoreFileScanner's
-     * based on list from #1 3. perform compaction and save resulting files 
under tmp dir 4. swap in
-     * compacted files #1 is guarded by store lock. This patch does not change 
this --> no worse or
-     * better For #2, we obtain smallest read point (for region) across all 
the Scanners (for both
-     * default compactor and stripe compactor). The read points are for user 
scans. Region keeps the
-     * read points for all currently open user scanners. Compaction needs to 
know the smallest read
-     * point so that during re-write of the hfiles, it can remove the mvcc 
points for the cells if
-     * their mvccs are older than the smallest since they are not needed 
anymore. This will not
-     * conflict with compaction. For #3, it can be performed in parallel to 
other operations. For #4
-     * bulk load and compaction don't conflict with each other on the region 
level (for multi-family
-     * atomicy). Region close and compaction are guarded pretty well by the 
'writestate'. In
-     * HRegion#doClose(), we have : synchronized (writestate) { // Disable 
compacting and flushing
-     * by background threads for this // region. canFlush = 
!writestate.readOnly;
-     * writestate.writesEnabled = false; LOG.debug("Closing " + this +
-     * ": disabling compactions & flushes"); waitForFlushesAndCompactions(); }
-     * waitForFlushesAndCompactions() would wait for writestate.compacting to 
come down to 0. and in
-     * HRegion.compact() try { synchronized (writestate) { if 
(writestate.writesEnabled) {
-     * wasStateSet = true; ++writestate.compacting; } else { String msg = "NOT 
compacting region " +
-     * this + ". Writes disabled."; LOG.info(msg); status.abort(msg); return 
false; } } Also in
-     * compactor.performCompaction(): check periodically to see if a system 
stop is requested if
-     * (closeChecker != null && closeChecker.isTimeLimit(store, now)) { 
progress.cancel(); return
-     * false; } if (closeChecker != null && closeChecker.isSizeLimit(store, 
len)) {
-     * progress.cancel(); return false; }
-     */
     try {
       byte[] cf = Bytes.toBytes(store.getColumnFamilyName());
       if (stores.get(cf) != store) {
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 250555fed3c..c3ad6ed52b1 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -826,7 +826,7 @@ public class HStore
         try {
           for (Path pathName : pathNames) {
             lastPathName = pathName;
-            storeEngine.validateStoreFile(pathName);
+            storeEngine.validateStoreFile(pathName, false);
           }
           return pathNames;
         } catch (Exception e) {
@@ -1112,7 +1112,7 @@ public class HStore
    * block for long periods.
    * <p>
    * During this time, the Store can work as usual, getting values from 
StoreFiles and writing new
-   * StoreFiles from the memstore. Existing StoreFiles are not destroyed until 
the new compacted
+   * StoreFiles from the MemStore. Existing StoreFiles are not destroyed until 
the new compacted
    * StoreFile is completely written-out to disk.
    * <p>
    * The compactLock prevents multiple simultaneous compactions. The 
structureLock prevents us from
@@ -1123,21 +1123,29 @@ public class HStore
    * <p>
    * Compaction event should be idempotent, since there is no IO Fencing for 
the region directory in
    * hdfs. A region server might still try to complete the compaction after it 
lost the region. That
-   * is why the following events are carefully ordered for a compaction: 1. 
Compaction writes new
-   * files under region/.tmp directory (compaction output) 2. Compaction 
atomically moves the
-   * temporary file under region directory 3. Compaction appends a WAL edit 
containing the
-   * compaction input and output files. Forces sync on WAL. 4. Compaction 
deletes the input files
-   * from the region directory. Failure conditions are handled like this: - If 
RS fails before 2,
-   * compaction wont complete. Even if RS lives on and finishes the compaction 
later, it will only
-   * write the new data file to the region directory. Since we already have 
this data, this will be
-   * idempotent but we will have a redundant copy of the data. - If RS fails 
between 2 and 3, the
-   * region will have a redundant copy of the data. The RS that failed won't 
be able to finish
-   * sync() for WAL because of lease recovery in WAL. - If RS fails after 3, 
the region region
-   * server who opens the region will pick up the the compaction marker from 
the WAL and replay it
-   * by removing the compaction input files. Failed RS can also attempt to 
delete those files, but
-   * the operation will be idempotent See HBASE-2231 for details.
+   * is why the following events are carefully ordered for a compaction:
+   * <ol>
+   * <li>Compaction writes new files under region/.tmp directory (compaction 
output)</li>
+   * <li>Compaction atomically moves the temporary file under region 
directory</li>
+   * <li>Compaction appends a WAL edit containing the compaction input and 
output files. Forces sync
+   * on WAL.</li>
+   * <li>Compaction deletes the input files from the region directory.</li>
+   * </ol>
+   * Failure conditions are handled like this:
+   * <ul>
+   * <li>If RS fails before 2, compaction won't complete. Even if RS lives on 
and finishes the
+   * compaction later, it will only write the new data file to the region 
directory. Since we
+   * already have this data, this will be idempotent, but we will have a 
redundant copy of the
+   * data.</li>
+   * <li>If RS fails between 2 and 3, the region will have a redundant copy of 
the data. The RS that
+   * failed won't be able to finish sync() for WAL because of lease recovery 
in WAL.</li>
+   * <li>If RS fails after 3, the region server who opens the region will pick 
up the compaction
+   * marker from the WAL and replay it by removing the compaction input files. 
Failed RS can also
+   * attempt to delete those files, but the operation will be idempotent</li>
+   * </ul>
+   * See HBASE-2231 for details.
    * @param compaction compaction details obtained from requestCompaction()
-   * @return Storefile we compacted into or null if we failed or opted out 
early.
+   * @return The storefiles that we compacted into or null if we failed or 
opted out early.
    */
   public List<HStoreFile> compact(CompactionContext compaction,
     ThroughputController throughputController, User user) throws IOException {
@@ -1180,7 +1188,7 @@ public class HStore
     throws IOException {
     // Do the steps necessary to complete the compaction.
     setStoragePolicyFromFileName(newFiles);
-    List<HStoreFile> sfs = storeEngine.commitStoreFiles(newFiles, true);
+    List<HStoreFile> sfs = storeEngine.commitStoreFiles(newFiles, true, true);
     if (this.getCoprocessorHost() != null) {
       for (HStoreFile sf : sfs) {
         getCoprocessorHost().postCompact(this, sf, cr.getTracker(), cr, user);
@@ -1973,7 +1981,7 @@ public class HStore
           return false;
         }
         status.setStatus("Flushing " + this + ": reopening flushed file");
-        List<HStoreFile> storeFiles = storeEngine.commitStoreFiles(tempFiles, 
false);
+        List<HStoreFile> storeFiles = storeEngine.commitStoreFiles(tempFiles, 
false, false);
         for (HStoreFile sf : storeFiles) {
           StoreFileReader r = sf.getReader();
           if (LOG.isInfoEnabled()) {
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java
index 5923befbc9d..8d81c90144f 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java
@@ -36,7 +36,9 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Function;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.io.hfile.BloomFilterMetrics;
 import org.apache.hadoop.hbase.log.HBaseMarkers;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
@@ -95,6 +97,9 @@ public abstract class StoreEngine<SF extends StoreFlusher, CP 
extends Compaction
 
   private static final Logger LOG = LoggerFactory.getLogger(StoreEngine.class);
 
+  private static final String READ_FULLY_ON_VALIDATE_KEY = 
"hbase.hstore.validate.read_fully";
+  private static final boolean DEFAULT_READ_FULLY_ON_VALIDATE = false;
+
   protected SF storeFlusher;
   protected CP compactionPolicy;
   protected C compactor;
@@ -162,7 +167,7 @@ public abstract class StoreEngine<SF extends StoreFlusher, 
CP extends Compaction
   }
 
   /** Returns Store flusher to use. */
-  public StoreFlusher getStoreFlusher() {
+  StoreFlusher getStoreFlusher() {
     return this.storeFlusher;
   }
 
@@ -201,7 +206,7 @@ public abstract class StoreEngine<SF extends StoreFlusher, 
CP extends Compaction
     this.openStoreFileThreadPoolCreator = 
store.getHRegion()::getStoreFileOpenAndCloseThreadPool;
     this.storeFileTracker = createStoreFileTracker(conf, store);
     assert compactor != null && compactionPolicy != null && storeFileManager 
!= null
-      && storeFlusher != null && storeFileTracker != null;
+      && storeFlusher != null;
   }
 
   /**
@@ -229,12 +234,34 @@ public abstract class StoreEngine<SF extends 
StoreFlusher, CP extends Compaction
   /**
    * Validates a store file by opening and closing it. In HFileV2 this should 
not be an expensive
    * operation.
-   * @param path the path to the store file
+   * @param path         the path to the store file
+   * @param isCompaction whether this is called from the context of a 
compaction
    */
-  public void validateStoreFile(Path path) throws IOException {
+  public void validateStoreFile(Path path, boolean isCompaction) throws 
IOException {
     HStoreFile storeFile = null;
     try {
       storeFile = createStoreFileAndReader(path);
+      if (conf.getBoolean(READ_FULLY_ON_VALIDATE_KEY, 
DEFAULT_READ_FULLY_ON_VALIDATE)) {
+        if (!storeFile.getFirstKey().isPresent()) {
+          LOG.debug("'{}=true' but storefile does not contain any data. 
skipping validation.",
+            READ_FULLY_ON_VALIDATE_KEY);
+          return;
+        }
+        LOG.debug("Validating the store file by reading the first cell from 
each block : {}", path);
+        StoreFileReader reader = storeFile.getReader();
+        try (StoreFileScanner scanner =
+          reader.getStoreFileScanner(false, false, isCompaction, 
Long.MAX_VALUE, 0, false)) {
+          boolean hasNext = scanner.seek(KeyValue.LOWESTKEY);
+          assert hasNext : "StoreFile contains no data";
+          for (Cell cell = scanner.next(); cell != null; cell = 
scanner.next()) {
+            Cell nextIndexedKey = scanner.getNextIndexedKey();
+            if (nextIndexedKey == null) {
+              break;
+            }
+            scanner.seek(nextIndexedKey);
+          }
+        }
+      }
     } catch (IOException e) {
       LOG.error("Failed to open store file : {}, keeping it in tmp location", 
path, e);
       throw e;
@@ -294,8 +321,7 @@ public abstract class StoreEngine<SF extends StoreFlusher, 
CP extends Compaction
     }
     if (ioe != null) {
       // close StoreFile readers
-      boolean evictOnClose =
-        ctx.getCacheConf() != null ? ctx.getCacheConf().shouldEvictOnClose() : 
true;
+      boolean evictOnClose = ctx.getCacheConf() == null || 
ctx.getCacheConf().shouldEvictOnClose();
       for (HStoreFile file : results) {
         try {
           if (file != null) {
@@ -315,10 +341,8 @@ public abstract class StoreEngine<SF extends StoreFlusher, 
CP extends Compaction
       for (HStoreFile storeFile : results) {
         if (compactedStoreFiles.contains(storeFile.getPath().getName())) {
           LOG.warn("Clearing the compacted storefile {} from {}", storeFile, 
this);
-          storeFile.getReader()
-            .close(storeFile.getCacheConf() != null
-              ? storeFile.getCacheConf().shouldEvictOnClose()
-              : true);
+          storeFile.getReader().close(
+            storeFile.getCacheConf() == null || 
storeFile.getCacheConf().shouldEvictOnClose());
           filesToRemove.add(storeFile);
         }
       }
@@ -380,7 +404,7 @@ public abstract class StoreEngine<SF extends StoreFlusher, 
CP extends Compaction
       compactedFilesSet.put(sf.getFileInfo(), sf);
     }
 
-    Set<StoreFileInfo> newFilesSet = new HashSet<StoreFileInfo>(newFiles);
+    Set<StoreFileInfo> newFilesSet = new HashSet<>(newFiles);
     // Exclude the files that have already been compacted
     newFilesSet = Sets.difference(newFilesSet, compactedFilesSet.keySet());
     Set<StoreFileInfo> toBeAddedFiles = Sets.difference(newFilesSet, 
currentFilesSet.keySet());
@@ -390,8 +414,8 @@ public abstract class StoreEngine<SF extends StoreFlusher, 
CP extends Compaction
       return;
     }
 
-    LOG.info("Refreshing store files for " + this + " files to add: " + 
toBeAddedFiles
-      + " files to remove: " + toBeRemovedFiles);
+    LOG.info("Refreshing store files for {} files to add: {} files to remove: 
{}", this,
+      toBeAddedFiles, toBeRemovedFiles);
 
     Set<HStoreFile> toBeRemovedStoreFiles = new 
HashSet<>(toBeRemovedFiles.size());
     for (StoreFileInfo sfi : toBeRemovedFiles) {
@@ -401,7 +425,7 @@ public abstract class StoreEngine<SF extends StoreFlusher, 
CP extends Compaction
     // try to open the files
     List<HStoreFile> openedFiles = openStoreFiles(toBeAddedFiles, false);
 
-    // propogate the file changes to the underlying store file manager
+    // propagate the file changes to the underlying store file manager
     replaceStoreFiles(toBeRemovedStoreFiles, openedFiles, () -> {
     }, () -> {
     }); // won't throw an exception
@@ -411,11 +435,13 @@ public abstract class StoreEngine<SF extends 
StoreFlusher, CP extends Compaction
    * Commit the given {@code files}.
    * <p/>
    * We will move the file into data directory, and open it.
-   * @param files    the files want to commit
-   * @param validate whether to validate the store files
+   * @param files        the files want to commit
+   * @param isCompaction whether this is called from the context of a 
compaction
+   * @param validate     whether to validate the store files
    * @return the committed store files
    */
-  public List<HStoreFile> commitStoreFiles(List<Path> files, boolean validate) 
throws IOException {
+  public List<HStoreFile> commitStoreFiles(List<Path> files, boolean 
isCompaction, boolean validate)
+    throws IOException {
     List<HStoreFile> committedFiles = new ArrayList<>(files.size());
     HRegionFileSystem hfs = ctx.getRegionFileSystem();
     String familyName = ctx.getFamily().getNameAsString();
@@ -423,13 +449,13 @@ public abstract class StoreEngine<SF extends 
StoreFlusher, CP extends Compaction
     for (Path file : files) {
       try {
         if (validate) {
-          validateStoreFile(file);
+          validateStoreFile(file, isCompaction);
         }
         Path committedPath;
         // As we want to support writing to data directory directly, here we 
need to check whether
         // the store file is already in the right place
         if (file.getParent() != null && file.getParent().equals(storeDir)) {
-          // already in the right place, skip renmaing
+          // already in the right place, skip renaming
           committedPath = file;
         } else {
           // Write-out finished successfully, move into the right spot
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
index 383708cb711..9136bf961a0 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
@@ -23,6 +23,12 @@ import static 
org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
 import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER;
 import static 
org.apache.hadoop.hbase.regionserver.compactions.CloseChecker.SIZE_LIMIT_KEY;
 import static 
org.apache.hadoop.hbase.regionserver.compactions.CloseChecker.TIME_LIMIT_KEY;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.hasProperty;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.notNullValue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThrows;
@@ -35,13 +41,17 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.zip.GZIPInputStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -60,6 +70,7 @@ import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import 
org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
@@ -75,6 +86,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.io.IOUtils;
 import org.junit.After;
 import org.junit.Assume;
 import org.junit.Before;
@@ -86,6 +98,7 @@ import org.junit.rules.TestName;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
+import org.slf4j.LoggerFactory;
 
 /**
  * Test compaction framework and common functions
@@ -114,8 +127,6 @@ public class TestCompaction {
 
   /** constructor */
   public TestCompaction() {
-    super();
-
     // Set cache flush size to 1MB
     conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024);
     conf.setInt(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, 100);
@@ -142,6 +153,12 @@ public class TestCompaction {
       hcd.setMaxVersions(65536);
       this.htd.addFamily(hcd);
     }
+    if (name.getMethodName().equals("testCompactionWithCorruptBlock")) {
+      UTIL.getConfiguration().setBoolean("hbase.hstore.validate.read_fully", 
true);
+      HColumnDescriptor hcd = new HColumnDescriptor(FAMILY);
+      hcd.setCompressionType(Compression.Algorithm.GZ);
+      this.htd.addFamily(hcd);
+    }
     this.r = UTIL.createLocalHRegion(htd, null, null);
   }
 
@@ -353,6 +370,7 @@ public class TestCompaction {
     try (FSDataOutputStream stream = fs.create(tmpPath, null, true, 512, 
(short) 3, 1024L, null)) {
       stream.writeChars("CORRUPT FILE!!!!");
     }
+
     // The complete compaction should fail and the corrupt file should remain
     // in the 'tmp' directory;
     assertThrows(IOException.class, () -> store.doCompaction(null, null, null,
@@ -360,6 +378,59 @@ public class TestCompaction {
     assertTrue(fs.exists(tmpPath));
   }
 
+  /**
+   * This test uses a hand-modified HFile, which is loaded in from the 
resources' path. That file
+   * was generated from the test support code in this class and then edited to 
corrupt the
+   * GZ-encoded block by zeroing-out the first two bytes of the GZip header, 
the "standard
+   * declaration" of {@code 1f 8b}, found at offset 33 in the file. I'm not 
sure why, but it seems
+   * that in this test context we do not enforce CRC checksums. Thus, this 
corruption manifests in
+   * the Decompressor rather than in the reader when it loads the block bytes 
and compares vs. the
+   * header.
+   */
+  @Test
+  public void testCompactionWithCorruptBlock() throws Exception {
+    createStoreFile(r, Bytes.toString(FAMILY));
+    createStoreFile(r, Bytes.toString(FAMILY));
+    HStore store = r.getStore(FAMILY);
+
+    Collection<HStoreFile> storeFiles = store.getStorefiles();
+    DefaultCompactor tool = (DefaultCompactor) 
store.storeEngine.getCompactor();
+    CompactionRequestImpl request = new CompactionRequestImpl(storeFiles);
+    tool.compact(request, NoLimitThroughputController.INSTANCE, null);
+
+    // insert the hfile with a corrupted data block into the region's tmp 
directory, where
+    // compaction output is collected.
+    FileSystem fs = store.getFileSystem();
+    Path tmpPath = store.getRegionFileSystem().createTempName();
+    try (
+      InputStream inputStream =
+        
getClass().getResourceAsStream("TestCompaction_HFileWithCorruptBlock.gz");
+      GZIPInputStream gzipInputStream = new 
GZIPInputStream(Objects.requireNonNull(inputStream));
+      OutputStream outputStream = fs.create(tmpPath, null, true, 512, (short) 
3, 1024L, null)) {
+      assertThat(gzipInputStream, notNullValue());
+      assertThat(outputStream, notNullValue());
+      IOUtils.copyBytes(gzipInputStream, outputStream, 512);
+    }
+    LoggerFactory.getLogger(TestCompaction.class).info("Wrote corrupted HFile 
to {}", tmpPath);
+
+    // The complete compaction should fail and the corrupt file should remain
+    // in the 'tmp' directory;
+    try {
+      store.doCompaction(request, storeFiles, null, 
EnvironmentEdgeManager.currentTime(),
+        Collections.singletonList(tmpPath));
+    } catch (IOException e) {
+      Throwable rootCause = e;
+      while (rootCause.getCause() != null) {
+        rootCause = rootCause.getCause();
+      }
+      assertThat(rootCause, allOf(instanceOf(IOException.class),
+        hasProperty("message", containsString("not a gzip file"))));
+      assertTrue(fs.exists(tmpPath));
+      return;
+    }
+    fail("Compaction should have failed due to corrupt block");
+  }
+
   /**
    * Create a custom compaction request and be sure that we can track it 
through the queue, knowing
    * when the compaction is completed.
diff --git 
a/hbase-server/src/test/resources/org/apache/hadoop/hbase/regionserver/TestCompaction_HFileWithCorruptBlock.gz
 
b/hbase-server/src/test/resources/org/apache/hadoop/hbase/regionserver/TestCompaction_HFileWithCorruptBlock.gz
new file mode 100644
index 00000000000..c93407b455c
Binary files /dev/null and 
b/hbase-server/src/test/resources/org/apache/hadoop/hbase/regionserver/TestCompaction_HFileWithCorruptBlock.gz
 differ

Reply via email to