This is an automated email from the ASF dual-hosted git repository.
ndimiduk pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 636703bb7ea HBASE-29131 Introduce the option for post-compaction validation of HFiles (#6700)
636703bb7ea is described below
commit 636703bb7ea46ee0d0d4436d88284cc66a01a8cc
Author: Nick Dimiduk <[email protected]>
AuthorDate: Mon Feb 17 17:00:43 2025 +0100
HBASE-29131 Introduce the option for post-compaction validation of HFiles (#6700)
Introduces the option for an HStore to fully read the file it just wrote after a flush or
compaction.
To enable this feature, set `hbase.hstore.validate.read_fully=true`. This is an HStore
configuration feature, so it can be enabled in hbase-site.xml, in the TableDescriptor, or in the
ColumnFamilyDescriptor.
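As an illustration, here is a minimal sketch of the three levels at which the property can be
set. The property name is the one introduced by this change; the Configuration and
descriptor-builder calls below are the standard HBase 2.x client API, and the class and table
names are placeholders for this example, not code from this patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
    import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
    import org.apache.hadoop.hbase.util.Bytes;

    public class ReadFullyValidationExample {
      public static void main(String[] args) {
        // Cluster-wide: the programmatic equivalent of setting it in hbase-site.xml.
        Configuration conf = HBaseConfiguration.create();
        conf.setBoolean("hbase.hstore.validate.read_fully", true);

        // Per-table: carried as a value on the TableDescriptor.
        TableDescriptorBuilder.newBuilder(TableName.valueOf("t1"))
          .setValue("hbase.hstore.validate.read_fully", "true").build();

        // Per-family: carried as configuration on the ColumnFamilyDescriptor.
        ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("f1"))
          .setConfiguration("hbase.hstore.validate.read_fully", "true").build();
      }
    }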
Signed-off-by: Peter Somogyi <[email protected]>
---
.../apache/hadoop/hbase/regionserver/HRegion.java | 130 +++++++++++++++------
.../apache/hadoop/hbase/regionserver/HStore.java | 44 ++++---
.../hadoop/hbase/regionserver/StoreEngine.java | 64 +++++++---
.../hadoop/hbase/regionserver/TestCompaction.java | 75 +++++++++++-
.../TestCompaction_HFileWithCorruptBlock.gz | Bin 0 -> 952 bytes
5 files changed, 240 insertions(+), 73 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 8db4dd331f2..782ee72a819 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -2274,6 +2274,102 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return compact(compaction, store, throughputController, null);
}
+ /**
+ * <p>
+ * We are trying to remove / relax the region read lock for compaction. Let's see what are the
+ * potential race conditions among the operations (user scan, region split, region close and
+ * region bulk load).
+ * </p>
+ *
+ * <pre>
+ * user scan ---> region read lock
+ * region split --> region close first --> region write lock
+ * region close --> region write lock
+ * region bulk load --> region write lock
+ * </pre>
+ * <p>
+ * Read lock is compatible with read lock ---> no problem with user scan/read. Region bulk load
+ * does not cause a problem for compaction (no consistency problem; the store lock will handle the
+ * store file accounting). They can run almost concurrently at the region level.
+ * </p>
+ * <p>
+ * The only remaining race condition is between region close and compaction. So we will
+ * evaluate, below, how region close interferes with compaction if compaction does not acquire the
+ * region read lock.
+ * </p>
+ * <p>
+ * Here are the steps for compaction:
+ * <ol>
+ * <li>obtain list of StoreFile's</li>
+ * <li>create StoreFileScanner's based on list from #1</li>
+ * <li>perform compaction and save resulting files under tmp dir</li>
+ * <li>swap in compacted files</li>
+ * </ol>
+ * </p>
+ * <p>
+ * #1 is guarded by the store lock. This patch does not change this --> no worse or better. For
+ * #2, we obtain the smallest read point (for the region) across all the Scanners (for both the
+ * default compactor and the stripe compactor). The read points are for user scans. The Region
+ * keeps the read points for all currently open user scanners. Compaction needs to know the
+ * smallest read point so that, during re-write of the hfiles, it can remove the mvcc points for
+ * cells whose mvccs are older than the smallest, since they are not needed anymore. This will not
+ * conflict with compaction.
+ * </p>
+ * <p>
+ * For #3, it can be performed in parallel to other operations.
+ * </p>
+ * <p>
+ * For #4, bulk load and compaction don't conflict with each other on the region level (for
+ * multi-family atomicity).
+ * </p>
+ * <p>
+ * Region close and compaction are guarded pretty well by the 'writestate'. In HRegion#doClose(),
+ * we have:
+ *
+ * <pre>
+ * synchronized (writestate) {
+ * // Disable compacting and flushing by background threads for this
+ * // region.
+ * canFlush = !writestate.readOnly;
+ * writestate.writesEnabled = false;
+ * LOG.debug("Closing " + this + ": disabling compactions & flushes");
+ * waitForFlushesAndCompactions();
+ * }
+ * </pre>
+ *
+ * {@code waitForFlushesAndCompactions()} would wait for {@code writestate.compacting} to come
+ * down to 0, and in {@code HRegion.compact()}:
+ *
+ * <pre>
+ * try {
+ * synchronized (writestate) {
+ * if (writestate.writesEnabled) {
+ * wasStateSet = true;
+ * ++writestate.compacting;
+ * } else {
+ * String msg = "NOT compacting region " + this + ". Writes disabled.";
+ * LOG.info(msg);
+ * status.abort(msg);
+ * return false;
+ * }
+ * }
+ * }
+ * </pre>
+ *
+ * Also in {@code compactor.performCompaction()}: check periodically to see if a system stop is
+ * requested
+ *
+ * <pre>
+ * if (closeChecker != null && closeChecker.isTimeLimit(store, now)) {
+ * progress.cancel();
+ * return false;
+ * }
+ * if (closeChecker != null && closeChecker.isSizeLimit(store, len)) {
+ * progress.cancel();
+ * return false;
+ * }
+ * </pre>
+ * </p>
+ */
public boolean compact(CompactionContext compaction, HStore store,
ThroughputController throughputController, User user) throws IOException {
assert compaction != null && compaction.hasSelection();
@@ -2285,40 +2381,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
MonitoredTask status = null;
boolean requestNeedsCancellation = true;
- /*
- * We are trying to remove / relax the region read lock for compaction. Let's see what are the
- * potential race conditions among the operations (user scan, region split, region close and
- * region bulk load). user scan ---> region read lock region split --> region close first -->
- * region write lock region close --> region write lock region bulk load --> region write lock
- * read lock is compatible with read lock. ---> no problem with user scan/read region bulk load
- * does not cause problem for compaction (no consistency problem, store lock will help the store
- * file accounting). They can run almost concurrently at the region level. The only remaining
- * race condition is between the region close and compaction. So we will evaluate, below, how
- * region close intervenes with compaction if compaction does not acquire region read lock. Here
- * are the steps for compaction: 1. obtain list of StoreFile's 2. create StoreFileScanner's
- * based on list from #1 3. perform compaction and save resulting files under tmp dir 4. swap in
- * compacted files #1 is guarded by store lock. This patch does not change this --> no worse or
- * better For #2, we obtain smallest read point (for region) across all the Scanners (for both
- * default compactor and stripe compactor). The read points are for user scans. Region keeps the
- * read points for all currently open user scanners. Compaction needs to know the smallest read
- * point so that during re-write of the hfiles, it can remove the mvcc points for the cells if
- * their mvccs are older than the smallest since they are not needed anymore. This will not
- * conflict with compaction. For #3, it can be performed in parallel to other operations. For #4
- * bulk load and compaction don't conflict with each other on the region level (for multi-family
- * atomicy). Region close and compaction are guarded pretty well by the 'writestate'. In
- * HRegion#doClose(), we have : synchronized (writestate) { // Disable compacting and flushing
- * by background threads for this // region. canFlush = !writestate.readOnly;
- * writestate.writesEnabled = false; LOG.debug("Closing " + this +
- * ": disabling compactions & flushes"); waitForFlushesAndCompactions(); }
- * waitForFlushesAndCompactions() would wait for writestate.compacting to come down to 0. and in
- * HRegion.compact() try { synchronized (writestate) { if (writestate.writesEnabled) {
- * wasStateSet = true; ++writestate.compacting; } else { String msg = "NOT compacting region " +
- * this + ". Writes disabled."; LOG.info(msg); status.abort(msg); return false; } } Also in
- * compactor.performCompaction(): check periodically to see if a system stop is requested if
- * (closeChecker != null && closeChecker.isTimeLimit(store, now)) { progress.cancel(); return
- * false; } if (closeChecker != null && closeChecker.isSizeLimit(store, len)) {
- * progress.cancel(); return false; }
- */
try {
byte[] cf = Bytes.toBytes(store.getColumnFamilyName());
if (stores.get(cf) != store) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index c88e4afbe8e..5262cdac5fa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -835,7 +835,7 @@ public class HStore
try {
for (Path pathName : pathNames) {
lastPathName = pathName;
- storeEngine.validateStoreFile(pathName);
+ storeEngine.validateStoreFile(pathName, false);
}
return pathNames;
} catch (Exception e) {
@@ -1121,7 +1121,7 @@ public class HStore
* block for long periods.
* <p>
* During this time, the Store can work as usual, getting values from StoreFiles and writing new
- * StoreFiles from the memstore. Existing StoreFiles are not destroyed until the new compacted
+ * StoreFiles from the MemStore. Existing StoreFiles are not destroyed until the new compacted
* StoreFile is completely written-out to disk.
* <p>
* The compactLock prevents multiple simultaneous compactions. The structureLock prevents us from
@@ -1132,21 +1132,29 @@ public class HStore
* <p>
* Compaction event should be idempotent, since there is no IO Fencing for the region directory in
* hdfs. A region server might still try to complete the compaction after it lost the region. That
- * is why the following events are carefully ordered for a compaction: 1. Compaction writes new
- * files under region/.tmp directory (compaction output) 2. Compaction atomically moves the
- * temporary file under region directory 3. Compaction appends a WAL edit containing the
- * compaction input and output files. Forces sync on WAL. 4. Compaction deletes the input files
- * from the region directory. Failure conditions are handled like this: - If RS fails before 2,
- * compaction wont complete. Even if RS lives on and finishes the compaction later, it will only
- * write the new data file to the region directory. Since we already have this data, this will be
- * idempotent but we will have a redundant copy of the data. - If RS fails between 2 and 3, the
- * region will have a redundant copy of the data. The RS that failed won't be able to finish
- * sync() for WAL because of lease recovery in WAL. - If RS fails after 3, the region region
- * server who opens the region will pick up the the compaction marker from the WAL and replay it
- * by removing the compaction input files. Failed RS can also attempt to delete those files, but
- * the operation will be idempotent See HBASE-2231 for details.
+ * is why the following events are carefully ordered for a compaction:
+ * <ol>
+ * <li>Compaction writes new files under region/.tmp directory (compaction output)</li>
+ * <li>Compaction atomically moves the temporary file under region directory</li>
+ * <li>Compaction appends a WAL edit containing the compaction input and output files. Forces sync
+ * on WAL.</li>
+ * <li>Compaction deletes the input files from the region directory.</li>
+ * </ol>
+ * Failure conditions are handled like this:
+ * <ul>
+ * <li>If RS fails before 2, compaction won't complete. Even if RS lives on and finishes the
+ * compaction later, it will only write the new data file to the region directory. Since we
+ * already have this data, this will be idempotent, but we will have a redundant copy of the
+ * data.</li>
+ * <li>If RS fails between 2 and 3, the region will have a redundant copy of the data. The RS that
+ * failed won't be able to finish sync() for WAL because of lease recovery in WAL.</li>
+ * <li>If RS fails after 3, the region server that opens the region will pick up the compaction
+ * marker from the WAL and replay it by removing the compaction input files. The failed RS can
+ * also attempt to delete those files, but the operation will be idempotent.</li>
+ * </ul>
+ * See HBASE-2231 for details.
* @param compaction compaction details obtained from requestCompaction()
- * @return Storefile we compacted into or null if we failed or opted out early.
+ * @return The storefiles that we compacted into or null if we failed or opted out early.
*/
public List<HStoreFile> compact(CompactionContext compaction,
ThroughputController throughputController, User user) throws IOException {
@@ -1189,7 +1197,7 @@ public class HStore
throws IOException {
// Do the steps necessary to complete the compaction.
setStoragePolicyFromFileName(newFiles);
- List<HStoreFile> sfs = storeEngine.commitStoreFiles(newFiles, true);
+ List<HStoreFile> sfs = storeEngine.commitStoreFiles(newFiles, true, true);
if (this.getCoprocessorHost() != null) {
for (HStoreFile sf : sfs) {
getCoprocessorHost().postCompact(this, sf, cr.getTracker(), cr, user);
@@ -1982,7 +1990,7 @@ public class HStore
return false;
}
status.setStatus("Flushing " + this + ": reopening flushed file");
- List<HStoreFile> storeFiles = storeEngine.commitStoreFiles(tempFiles, false);
+ List<HStoreFile> storeFiles = storeEngine.commitStoreFiles(tempFiles, false, false);
for (HStoreFile sf : storeFiles) {
StoreFileReader r = sf.getReader();
if (LOG.isInfoEnabled()) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java
index 65b70e7ee13..7bd4cd336c8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java
@@ -37,6 +37,8 @@ import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.ExtendedCell;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.hfile.BloomFilterMetrics;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
@@ -95,6 +97,9 @@ public abstract class StoreEngine<SF extends StoreFlusher, CP extends Compaction
private static final Logger LOG = LoggerFactory.getLogger(StoreEngine.class);
+ private static final String READ_FULLY_ON_VALIDATE_KEY = "hbase.hstore.validate.read_fully";
+ private static final boolean DEFAULT_READ_FULLY_ON_VALIDATE = false;
+
protected SF storeFlusher;
protected CP compactionPolicy;
protected C compactor;
@@ -162,7 +167,7 @@ public abstract class StoreEngine<SF extends StoreFlusher, CP extends Compaction
}
/** Returns Store flusher to use. */
- public StoreFlusher getStoreFlusher() {
+ StoreFlusher getStoreFlusher() {
return this.storeFlusher;
}
@@ -201,7 +206,7 @@ public abstract class StoreEngine<SF extends StoreFlusher, CP extends Compaction
this.openStoreFileThreadPoolCreator = store.getHRegion()::getStoreFileOpenAndCloseThreadPool;
this.storeFileTracker = createStoreFileTracker(conf, store);
assert compactor != null && compactionPolicy != null && storeFileManager != null
- && storeFlusher != null && storeFileTracker != null;
+ && storeFlusher != null;
}
/**
@@ -228,12 +233,34 @@ public abstract class StoreEngine<SF extends StoreFlusher, CP extends Compaction
/**
* Validates a store file by opening and closing it. In HFileV2 this should not be an expensive
* operation.
- * @param path the path to the store file
+ * @param path the path to the store file
+ * @param isCompaction whether this is called from the context of a compaction
*/
- public void validateStoreFile(Path path) throws IOException {
+ public void validateStoreFile(Path path, boolean isCompaction) throws IOException {
HStoreFile storeFile = null;
try {
storeFile = createStoreFileAndReader(path);
+ if (conf.getBoolean(READ_FULLY_ON_VALIDATE_KEY, DEFAULT_READ_FULLY_ON_VALIDATE)) {
+ if (!storeFile.getFirstKey().isPresent()) {
+ LOG.debug("'{}=true' but storefile does not contain any data.
skipping validation.",
+ READ_FULLY_ON_VALIDATE_KEY);
+ return;
+ }
+ LOG.debug("Validating the store file by reading the first cell from
each block : {}", path);
+ StoreFileReader reader = storeFile.getReader();
+ try (StoreFileScanner scanner =
+ reader.getStoreFileScanner(false, false, isCompaction, Long.MAX_VALUE, 0, false)) {
+ boolean hasNext = scanner.seek(KeyValue.LOWESTKEY);
+ assert hasNext : "StoreFile contains no data";
+ for (ExtendedCell cell = scanner.next(); cell != null; cell = scanner.next()) {
+ ExtendedCell nextIndexedKey = scanner.getNextIndexedKey();
+ if (nextIndexedKey == null) {
+ break;
+ }
+ scanner.seek(nextIndexedKey);
+ }
+ }
+ }
} catch (IOException e) {
LOG.error("Failed to open store file : {}, keeping it in tmp location",
path, e);
throw e;
@@ -293,8 +320,7 @@ public abstract class StoreEngine<SF extends StoreFlusher, CP extends Compaction
}
if (ioe != null) {
// close StoreFile readers
- boolean evictOnClose =
- ctx.getCacheConf() != null ? ctx.getCacheConf().shouldEvictOnClose() : true;
+ boolean evictOnClose = ctx.getCacheConf() == null || ctx.getCacheConf().shouldEvictOnClose();
for (HStoreFile file : results) {
try {
if (file != null) {
@@ -314,10 +340,8 @@ public abstract class StoreEngine<SF extends StoreFlusher, CP extends Compaction
for (HStoreFile storeFile : results) {
if (compactedStoreFiles.contains(storeFile.getPath().getName())) {
LOG.warn("Clearing the compacted storefile {} from {}", storeFile,
this);
- storeFile.getReader()
- .close(storeFile.getCacheConf() != null
- ? storeFile.getCacheConf().shouldEvictOnClose()
- : true);
+ storeFile.getReader().close(
+ storeFile.getCacheConf() == null || storeFile.getCacheConf().shouldEvictOnClose());
filesToRemove.add(storeFile);
}
}
@@ -379,7 +403,7 @@ public abstract class StoreEngine<SF extends StoreFlusher, CP extends Compaction
compactedFilesSet.put(sf.getFileInfo(), sf);
}
- Set<StoreFileInfo> newFilesSet = new HashSet<StoreFileInfo>(newFiles);
+ Set<StoreFileInfo> newFilesSet = new HashSet<>(newFiles);
// Exclude the files that have already been compacted
newFilesSet = Sets.difference(newFilesSet, compactedFilesSet.keySet());
Set<StoreFileInfo> toBeAddedFiles = Sets.difference(newFilesSet, currentFilesSet.keySet());
@@ -389,8 +413,8 @@ public abstract class StoreEngine<SF extends StoreFlusher, CP extends Compaction
return;
}
- LOG.info("Refreshing store files for " + this + " files to add: " +
toBeAddedFiles
- + " files to remove: " + toBeRemovedFiles);
+ LOG.info("Refreshing store files for {} files to add: {} files to remove:
{}", this,
+ toBeAddedFiles, toBeRemovedFiles);
Set<HStoreFile> toBeRemovedStoreFiles = new HashSet<>(toBeRemovedFiles.size());
for (StoreFileInfo sfi : toBeRemovedFiles) {
@@ -400,7 +424,7 @@ public abstract class StoreEngine<SF extends StoreFlusher, CP extends Compaction
// try to open the files
List<HStoreFile> openedFiles = openStoreFiles(toBeAddedFiles, false);
- // propogate the file changes to the underlying store file manager
+ // propagate the file changes to the underlying store file manager
replaceStoreFiles(toBeRemovedStoreFiles, openedFiles, () -> {
}, () -> {
}); // won't throw an exception
@@ -410,11 +434,13 @@ public abstract class StoreEngine<SF extends StoreFlusher, CP extends Compaction
* Commit the given {@code files}.
* <p/>
* We will move the file into data directory, and open it.
- * @param files the files want to commit
- * @param validate whether to validate the store files
+ * @param files the files to commit
+ * @param isCompaction whether this is called from the context of a compaction
+ * @param validate whether to validate the store files
* @return the committed store files
*/
- public List<HStoreFile> commitStoreFiles(List<Path> files, boolean validate) throws IOException {
+ public List<HStoreFile> commitStoreFiles(List<Path> files, boolean isCompaction, boolean validate)
+ throws IOException {
List<HStoreFile> committedFiles = new ArrayList<>(files.size());
HRegionFileSystem hfs = ctx.getRegionFileSystem();
String familyName = ctx.getFamily().getNameAsString();
@@ -422,13 +448,13 @@ public abstract class StoreEngine<SF extends StoreFlusher, CP extends Compaction
for (Path file : files) {
try {
if (validate) {
- validateStoreFile(file);
+ validateStoreFile(file, isCompaction);
}
Path committedPath;
// As we want to support writing to data directory directly, here we need to check whether
// the store file is already in the right place
if (file.getParent() != null && file.getParent().equals(storeDir)) {
- // already in the right place, skip renmaing
+ // already in the right place, skip renaming
committedPath = file;
} else {
// Write-out finished successfully, move into the right spot
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
index 332ecd8a95a..86fe54c9815 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
@@ -23,6 +23,12 @@ import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER;
import static org.apache.hadoop.hbase.regionserver.compactions.CloseChecker.SIZE_LIMIT_KEY;
import static org.apache.hadoop.hbase.regionserver.compactions.CloseChecker.TIME_LIMIT_KEY;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.hasProperty;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
@@ -35,13 +41,17 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.zip.GZIPInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -60,6 +70,7 @@ import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
@@ -75,6 +86,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.io.IOUtils;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
@@ -86,6 +98,7 @@ import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import org.slf4j.LoggerFactory;
/**
* Test compaction framework and common functions
@@ -114,8 +127,6 @@ public class TestCompaction {
/** constructor */
public TestCompaction() {
- super();
-
// Set cache flush size to 1MB
conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024);
conf.setInt(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, 100);
@@ -143,6 +154,12 @@ public class TestCompaction {
hcd.setMaxVersions(65536);
this.htd.addFamily(hcd);
}
+ if (name.getMethodName().equals("testCompactionWithCorruptBlock")) {
+ UTIL.getConfiguration().setBoolean("hbase.hstore.validate.read_fully", true);
+ HColumnDescriptor hcd = new HColumnDescriptor(FAMILY);
+ hcd.setCompressionType(Compression.Algorithm.GZ);
+ this.htd.addFamily(hcd);
+ }
this.r = UTIL.createLocalHRegion(htd, null, null);
}
@@ -354,6 +371,7 @@ public class TestCompaction {
try (FSDataOutputStream stream = fs.create(tmpPath, null, true, 512, (short) 3, 1024L, null)) {
stream.writeChars("CORRUPT FILE!!!!");
}
+
// The complete compaction should fail and the corrupt file should remain
// in the 'tmp' directory;
assertThrows(IOException.class, () -> store.doCompaction(null, null, null,
@@ -361,6 +379,59 @@ public class TestCompaction {
assertTrue(fs.exists(tmpPath));
}
+ /**
+ * This test uses a hand-modified HFile, which is loaded in from the resources' path. That file
+ * was generated from the test support code in this class and then edited to corrupt the
+ * GZ-encoded block by zeroing-out the first two bytes of the GZip header, the "standard
+ * declaration" of {@code 1f 8b}, found at offset 33 in the file. I'm not sure why, but it seems
+ * that in this test context we do not enforce CRC checksums. Thus, this corruption manifests in
+ * the Decompressor rather than in the reader when it loads the block bytes and compares vs. the
+ * header.
+ */
+ @Test
+ public void testCompactionWithCorruptBlock() throws Exception {
+ createStoreFile(r, Bytes.toString(FAMILY));
+ createStoreFile(r, Bytes.toString(FAMILY));
+ HStore store = r.getStore(FAMILY);
+
+ Collection<HStoreFile> storeFiles = store.getStorefiles();
+ DefaultCompactor tool = (DefaultCompactor) store.storeEngine.getCompactor();
+ CompactionRequestImpl request = new CompactionRequestImpl(storeFiles);
+ tool.compact(request, NoLimitThroughputController.INSTANCE, null);
+
+ // insert the hfile with a corrupted data block into the region's tmp directory, where
+ // compaction output is collected.
+ FileSystem fs = store.getFileSystem();
+ Path tmpPath = store.getRegionFileSystem().createTempName();
+ try (
+ InputStream inputStream =
+ getClass().getResourceAsStream("TestCompaction_HFileWithCorruptBlock.gz");
+ GZIPInputStream gzipInputStream = new GZIPInputStream(Objects.requireNonNull(inputStream));
+ OutputStream outputStream = fs.create(tmpPath, null, true, 512, (short) 3, 1024L, null)) {
+ assertThat(gzipInputStream, notNullValue());
+ assertThat(outputStream, notNullValue());
+ IOUtils.copyBytes(gzipInputStream, outputStream, 512);
+ }
+ LoggerFactory.getLogger(TestCompaction.class).info("Wrote corrupted HFile to {}", tmpPath);
+
+ // The complete compaction should fail and the corrupt file should remain
+ // in the 'tmp' directory;
+ try {
+ store.doCompaction(request, storeFiles, null, EnvironmentEdgeManager.currentTime(),
+ Collections.singletonList(tmpPath));
+ } catch (IOException e) {
+ Throwable rootCause = e;
+ while (rootCause.getCause() != null) {
+ rootCause = rootCause.getCause();
+ }
+ assertThat(rootCause, allOf(instanceOf(IOException.class),
+ hasProperty("message", containsString("not a gzip file"))));
+ assertTrue(fs.exists(tmpPath));
+ return;
+ }
+ fail("Compaction should have failed due to corrupt block");
+ }
+
/**
* Create a custom compaction request and be sure that we can track it through the queue, knowing
* when the compaction is completed.
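For reference, the corruption described in the test's javadoc can be reproduced with a few lines
of Java. The helper below is hypothetical (it is not part of this patch) and assumes the offset
and GZip magic bytes named above:

    import java.io.RandomAccessFile;

    public final class CorruptGzBlockHelper {
      public static void main(String[] args) throws Exception {
        // Zero out the two GZip magic bytes (1f 8b) found at offset 33, where
        // this particular HFile's first GZ-encoded block begins.
        try (RandomAccessFile file = new RandomAccessFile(args[0], "rw")) {
          file.seek(33);
          file.write(new byte[] { 0, 0 });
        }
      }
    }

The corrupted HFile was then gzipped and checked in as the test resource below.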
diff --git a/hbase-server/src/test/resources/org/apache/hadoop/hbase/regionserver/TestCompaction_HFileWithCorruptBlock.gz b/hbase-server/src/test/resources/org/apache/hadoop/hbase/regionserver/TestCompaction_HFileWithCorruptBlock.gz
new file mode 100644
index 00000000000..c93407b455c
Binary files /dev/null and b/hbase-server/src/test/resources/org/apache/hadoop/hbase/regionserver/TestCompaction_HFileWithCorruptBlock.gz differ