This is an automated email from the ASF dual-hosted git repository.

ndimiduk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new 20c4136bf6b HBASE-28065 Corrupt HFile data is mishandled in several cases
20c4136bf6b is described below

commit 20c4136bf6b842e8c5fc5010cce9fb76364c3d11
Author: Nick Dimiduk <ndimi...@apache.org>
AuthorDate: Thu Sep 21 12:06:39 2023 +0200

    HBASE-28065 Corrupt HFile data is mishandled in several cases
    
    * when no block size is provided and there is no preread headerBuf, treat the value with caution.
    * verify HBase checksums before making use of the block header.
    * inline verifyOnDiskSizeMatchesHeader to keep throw/return logic in the method body.
    * separate validation of onDiskSizeWithHeader as an input parameter from validation of the value read from the block header (a standalone sketch of this split appears just before the diff below).
    * simplify branching around fetching and populating onDiskSizeWithHeader.
    * inline retrieval of nextOnDiskBlockSize; add basic validation.
    * whenever a read is determined to be corrupt and a fallback to HDFS checksums is necessary, also invalidate the cached value of headerBuf.
    * build out a test suite covering various forms of block header corruption, for blocks in the first and second positions.
    
    Signed-off-by: Bryan Beaudreault <bbeaudrea...@apache.org>
---
 .../apache/hadoop/hbase/io/hfile/HFileBlock.java   | 154 ++++--
 .../apache/hadoop/hbase/io/hfile/TestChecksum.java |   2 +-
 .../apache/hadoop/hbase/io/hfile/TestHFile.java    |   7 +-
 .../io/hfile/TestHFileBlockHeaderCorruption.java   | 529 +++++++++++++++++++++
 4 files changed, 640 insertions(+), 52 deletions(-)
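
Before the diff itself, here is a minimal, standalone sketch of the two-step size
validation described in the commit message: one check for the size supplied by the
caller (where -1 means "unknown") and one for the size read from a block header. The
class and method names below are illustrative only, and the 33-byte header size is an
assumption (the HFile v2+ block header with HBase checksums); the real logic lives in
HFileBlock.FSReaderImpl as shown in the diff that follows.

    // Sketch only; not part of the patch. Mirrors the checkOnDiskSizeWithHeader /
    // checkCallerProvidedOnDiskSizeWithHeader split added to HFileBlock.FSReaderImpl.
    public final class OnDiskSizeValidationSketch {

      private final int hdrSize;

      public OnDiskSizeValidationSketch(int hdrSize) {
        this.hdrSize = hdrSize;
      }

      /** A size read from a block header must be non-negative and cover at least the header. */
      boolean checkOnDiskSizeWithHeader(int value) {
        if (value < 0) {
          return false; // a size can never be negative
        }
        return value - hdrSize >= 0; // reject values smaller than the header itself
      }

      /** A caller-provided size must fit in an int; -1 is the "size unknown" marker. */
      boolean checkCallerProvidedOnDiskSizeWithHeader(long value) {
        int intValue = (int) value; // same narrowing check as Math.toIntExact(long)
        if (intValue != value) {
          return false; // exceeds int range
        }
        if (intValue == -1) {
          return true; // magic value: the caller does not know the size
        }
        return checkOnDiskSizeWithHeader(intValue);
      }

      public static void main(String[] args) {
        // 33 bytes: assumed HFile v2+ block header size when HBase checksums are in use.
        OnDiskSizeValidationSketch v = new OnDiskSizeValidationSketch(33);
        System.out.println(v.checkCallerProvidedOnDiskSizeWithHeader(-1));   // true: unknown
        System.out.println(v.checkCallerProvidedOnDiskSizeWithHeader(4096)); // true: plausible
        System.out.println(v.checkCallerProvidedOnDiskSizeWithHeader(10));   // false: smaller than header
        System.out.println(v.checkCallerProvidedOnDiskSizeWithHeader(5_000_000_000L)); // false: exceeds int
      }
    }

In the patch, when a header-derived size fails this check and HBase checksum
verification is enabled, the reader invalidates the cached next-block header and falls
back to HDFS checksumming (returning null); otherwise it throws an IOException.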

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index b4bb2fb2c90..a3ead34730f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -392,12 +392,12 @@ public class HFileBlock implements Cacheable {
 
   /**
    * Parse total on disk size including header and checksum.
-   * @param headerBuf      Header ByteBuffer. Presumed exact size of header.
-   * @param verifyChecksum true if checksum verification is in use.
+   * @param headerBuf       Header ByteBuffer. Presumed exact size of header.
+   * @param checksumSupport true if checksum verification is in use.
    * @return Size of the block with header included.
    */
-  private static int getOnDiskSizeWithHeader(final ByteBuff headerBuf, boolean verifyChecksum) {
-    return headerBuf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX) + headerSize(verifyChecksum);
+  private static int getOnDiskSizeWithHeader(final ByteBuff headerBuf, boolean checksumSupport) {
+    return headerBuf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX) + headerSize(checksumSupport);
   }
 
   /**
@@ -1597,33 +1597,48 @@ public class HFileBlock implements Cacheable {
     }
 
     /**
-     * Returns Check <code>onDiskSizeWithHeaderL</code> size is healthy and then return it as an int
+     * Check that {@code value} read from a block header seems reasonable, within a large margin of
+     * error.
+     * @return {@code true} if the value is safe to proceed, {@code false} otherwise.
      */
-    private static int checkAndGetSizeAsInt(final long onDiskSizeWithHeaderL, final int hdrSize)
-      throws IOException {
-      if (
-        (onDiskSizeWithHeaderL < hdrSize && onDiskSizeWithHeaderL != -1)
-          || onDiskSizeWithHeaderL >= Integer.MAX_VALUE
-      ) {
-        throw new IOException(
-          "Invalid onDisksize=" + onDiskSizeWithHeaderL + ": expected to be at 
least " + hdrSize
-            + " and at most " + Integer.MAX_VALUE + ", or -1");
+    private boolean checkOnDiskSizeWithHeader(int value) {
+      if (value < 0) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(
+            "onDiskSizeWithHeader={}; value represents a size, so it should 
never be negative.",
+            value);
+        }
+        return false;
+      }
+      if (value - hdrSize < 0) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("onDiskSizeWithHeader={}, hdrSize={}; don't accept a value 
that is negative"
+            + " after the header size is excluded.", value, hdrSize);
+        }
+        return false;
       }
-      return (int) onDiskSizeWithHeaderL;
+      return true;
     }
 
     /**
-     * Verify the passed in onDiskSizeWithHeader aligns with what is in the header else something is
-     * not right.
+     * Check that {@code value} provided by the calling context seems reasonable, within a large
+     * margin of error.
+     * @return {@code true} if the value is safe to proceed, {@code false} otherwise.
      */
-    private void verifyOnDiskSizeMatchesHeader(final int passedIn, final ByteBuff headerBuf,
-      final long offset, boolean verifyChecksum) throws IOException {
-      // Assert size provided aligns with what is in the header
-      int fromHeader = getOnDiskSizeWithHeader(headerBuf, verifyChecksum);
-      if (passedIn != fromHeader) {
-        throw new IOException("Passed in onDiskSizeWithHeader=" + passedIn + " 
!= " + fromHeader
-          + ", offset=" + offset + ", fileContext=" + this.fileContext);
+    private boolean checkCallerProvidedOnDiskSizeWithHeader(long value) {
+      // same validation logic as is used by Math.toIntExact(long)
+      int intValue = (int) value;
+      if (intValue != value) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("onDiskSizeWithHeaderL={}; value exceeds int size 
limits.", value);
+        }
+        return false;
+      }
+      if (intValue == -1) {
+        // a magic value we expect to see.
+        return true;
       }
+      return checkOnDiskSizeWithHeader(intValue);
     }
 
     /**
@@ -1654,14 +1669,16 @@ public class HFileBlock implements Cacheable {
       this.prefetchedHeader.set(ph);
     }
 
-    private int getNextBlockOnDiskSize(boolean readNextHeader, ByteBuff onDiskBlock,
-      int onDiskSizeWithHeader) {
-      int nextBlockOnDiskSize = -1;
-      if (readNextHeader) {
-        nextBlockOnDiskSize =
-          onDiskBlock.getIntAfterPosition(onDiskSizeWithHeader + BlockType.MAGIC_LENGTH) + hdrSize;
-      }
-      return nextBlockOnDiskSize;
+    /**
+     * Clear the cached value when its integrity is suspect.
+     */
+    private void invalidateNextBlockHeader() {
+      prefetchedHeader.set(null);
+    }
+
+    private int getNextBlockOnDiskSize(ByteBuff onDiskBlock, int onDiskSizeWithHeader) {
+      return onDiskBlock.getIntAfterPosition(onDiskSizeWithHeader + BlockType.MAGIC_LENGTH)
+        + hdrSize;
     }
 
     private ByteBuff allocate(int size, boolean intoHeap) {
@@ -1687,17 +1704,21 @@ public class HFileBlock implements Cacheable {
     protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
       long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum, boolean updateMetrics,
       boolean intoHeap) throws IOException {
+      final Span span = Span.current();
+      final AttributesBuilder attributesBuilder = Attributes.builder();
+      Optional.of(Context.current()).map(val -> val.get(CONTEXT_KEY))
+        .ifPresent(c -> c.accept(attributesBuilder));
       if (offset < 0) {
         throw new IOException("Invalid offset=" + offset + " trying to read " 
+ "block (onDiskSize="
           + onDiskSizeWithHeaderL + ")");
       }
+      if (!checkCallerProvidedOnDiskSizeWithHeader(onDiskSizeWithHeaderL)) {
+        LOG.trace("Caller provided invalid onDiskSizeWithHeaderL={}", 
onDiskSizeWithHeaderL);
+        onDiskSizeWithHeaderL = -1;
+      }
+      int onDiskSizeWithHeader = (int) onDiskSizeWithHeaderL;
 
-      final Span span = Span.current();
-      final AttributesBuilder attributesBuilder = Attributes.builder();
-      Optional.of(Context.current()).map(val -> val.get(CONTEXT_KEY))
-        .ifPresent(c -> c.accept(attributesBuilder));
-      int onDiskSizeWithHeader = checkAndGetSizeAsInt(onDiskSizeWithHeaderL, hdrSize);
-      // Try and get cached header. Will serve us in rare case where onDiskSizeWithHeaderL is -1
+      // Try to use the cached header. Will serve us in rare case where onDiskSizeWithHeaderL==-1
       // and will save us having to seek the stream backwards to reread the header we
       // read the last time through here.
       ByteBuff headerBuf = getCachedHeader(offset);
@@ -1711,8 +1732,8 @@ public class HFileBlock implements Cacheable {
       // file has support for checksums (version 2+).
       boolean checksumSupport = this.fileContext.isUseHBaseChecksum();
       long startTime = EnvironmentEdgeManager.currentTime();
-      if (onDiskSizeWithHeader <= 0) {
-        // We were not passed the block size. Need to get it from the header. If header was
+      if (onDiskSizeWithHeader == -1) {
+        // The caller does not know the block size. Need to get it from the header. If header was
         // not cached (see getCachedHeader above), need to seek to pull it in. This is costly
         // and should happen very rarely. Currently happens on open of a hfile reader where we
         // read the trailer blocks to pull in the indices. Otherwise, we are reading block sizes
@@ -1729,6 +1750,19 @@ public class HFileBlock implements Cacheable {
         }
         onDiskSizeWithHeader = getOnDiskSizeWithHeader(headerBuf, checksumSupport);
       }
+
+      // The common case is that onDiskSizeWithHeader was produced by a read without checksum
+      // validation, so give it a sanity check before trying to use it.
+      if (!checkOnDiskSizeWithHeader(onDiskSizeWithHeader)) {
+        if (verifyChecksum) {
+          invalidateNextBlockHeader();
+          span.addEvent("Falling back to HDFS checksumming.", attributesBuilder.build());
+          return null;
+        } else {
+          throw new IOException("Invalid onDiskSizeWithHeader=" + 
onDiskSizeWithHeader);
+        }
+      }
+
       int preReadHeaderSize = headerBuf == null ? 0 : hdrSize;
       // Allocate enough space to fit the next block's header too; saves a seek next time through.
       // onDiskBlock is whole block + header + checksums then extra hdrSize to read next header;
@@ -1745,19 +1779,49 @@ public class HFileBlock implements Cacheable {
         boolean readNextHeader = readAtOffset(is, onDiskBlock,
           onDiskSizeWithHeader - preReadHeaderSize, true, offset + preReadHeaderSize, pread);
         onDiskBlock.rewind(); // in case of moving position when copying a cached header
-        int nextBlockOnDiskSize =
-          getNextBlockOnDiskSize(readNextHeader, onDiskBlock, onDiskSizeWithHeader);
+
+        // the call to validateChecksum for this block excludes the next block header over-read, so
+        // no reason to delay extracting this value.
+        int nextBlockOnDiskSize = -1;
+        if (readNextHeader) {
+          int parsedVal = getNextBlockOnDiskSize(onDiskBlock, onDiskSizeWithHeader);
+          if (checkOnDiskSizeWithHeader(parsedVal)) {
+            nextBlockOnDiskSize = parsedVal;
+          }
+        }
         if (headerBuf == null) {
           headerBuf = onDiskBlock.duplicate().position(0).limit(hdrSize);
         }
-        // Do a few checks before we go instantiate HFileBlock.
-        assert onDiskSizeWithHeader > this.hdrSize;
-        verifyOnDiskSizeMatchesHeader(onDiskSizeWithHeader, headerBuf, offset, checksumSupport);
+
         ByteBuff curBlock = onDiskBlock.duplicate().position(0).limit(onDiskSizeWithHeader);
         // Verify checksum of the data before using it for building HFileBlock.
         if (verifyChecksum && !validateChecksum(offset, curBlock, hdrSize)) {
+          invalidateNextBlockHeader();
+          span.addEvent("Falling back to HDFS checksumming.", attributesBuilder.build());
           return null;
         }
+
+        // TODO: is this check necessary or can we proceed with a provided value regardless of
+        // what is in the header?
+        int fromHeader = getOnDiskSizeWithHeader(headerBuf, checksumSupport);
+        if (onDiskSizeWithHeader != fromHeader) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("Passed in onDiskSizeWithHeader={} != {}, offset={}, 
fileContext={}",
+              onDiskSizeWithHeader, fromHeader, offset, this.fileContext);
+          }
+          if (checksumSupport && verifyChecksum) {
+            // This file supports HBase checksums and verification of those checksums was
+            // requested. The block size provided by the caller (presumably from the block index)
+            // does not match the block size written to the block header. Treat this as an
+            // HBase-checksum failure.
+            span.addEvent("Falling back to HDFS checksumming.", attributesBuilder.build());
+            invalidateNextBlockHeader();
+            return null;
+          }
+          throw new IOException("Passed in onDiskSizeWithHeader=" + 
onDiskSizeWithHeader + " != "
+            + fromHeader + ", offset=" + offset + ", fileContext=" + 
this.fileContext);
+        }
+
         // remove checksum from buffer now that it's verified
         int sizeWithoutChecksum = curBlock.getInt(Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX);
         curBlock.limit(sizeWithoutChecksum);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java
index fdd31fc4cf2..707a8b84c62 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java
@@ -61,7 +61,7 @@ public class TestChecksum {
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestChecksum.class);
 
-  private static final Logger LOG = LoggerFactory.getLogger(TestHFileBlock.class);
+  private static final Logger LOG = LoggerFactory.getLogger(TestChecksum.class);
 
   static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = { NONE, GZ };
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java
index e3370802220..7624e219791 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java
@@ -163,12 +163,7 @@ public class TestHFile {
     fillByteBuffAllocator(alloc, bufCount);
     // start write to store file.
     Path path = writeStoreFile();
-    try {
-      readStoreFile(path, conf, alloc);
-    } catch (Exception e) {
-      // fail test
-      assertTrue(false);
-    }
+    readStoreFile(path, conf, alloc);
     Assert.assertEquals(bufCount, alloc.getFreeBufferCount());
     alloc.clean();
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockHeaderCorruption.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockHeaderCorruption.java
new file mode 100644
index 00000000000..f74833a3b5e
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockHeaderCorruption.java
@@ -0,0 +1,529 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.hasProperty;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.startsWith;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+import java.time.Instant;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Random;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellBuilder;
+import org.apache.hadoop.hbase.CellBuilderFactory;
+import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExternalResource;
+import org.junit.rules.RuleChain;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This test provides coverage for HFileHeader block fields that are read and interpreted before
+ * HBase checksum validation can be applied. As of now, this is just
+ * {@code onDiskSizeWithoutHeader}.
+ */
+@Category({ IOTests.class, SmallTests.class })
+public class TestHFileBlockHeaderCorruption {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestHFileBlockHeaderCorruption.class);
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestHFileBlockHeaderCorruption.class);
+
+  private final HFileTestRule hFileTestRule;
+
+  @Rule
+  public final RuleChain ruleChain;
+
+  public TestHFileBlockHeaderCorruption() throws IOException {
+    TestName testName = new TestName();
+    hFileTestRule = new HFileTestRule(new HBaseTestingUtil(), testName);
+    ruleChain = RuleChain.outerRule(testName).around(hFileTestRule);
+  }
+
+  @Test
+  public void testOnDiskSizeWithoutHeaderCorruptionFirstBlock() throws Exception {
+    HFileBlockChannelPosition firstBlock = null;
+    try {
+      try (HFileBlockChannelPositionIterator it =
+        new HFileBlockChannelPositionIterator(hFileTestRule)) {
+        assertTrue(it.hasNext());
+        firstBlock = it.next();
+      }
+
+      Corrupter c = new Corrupter(firstBlock);
+
+      logHeader(firstBlock);
+      c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX,
+        ByteBuffer.wrap(Bytes.toBytes(Integer.MIN_VALUE)));
+      logHeader(firstBlock);
+      try (HFileBlockChannelPositionIterator it =
+        new HFileBlockChannelPositionIterator(hFileTestRule)) {
+        CountingConsumer consumer = new CountingConsumer(it);
+        try {
+          consumer.readFully();
+          fail();
+        } catch (Exception e) {
+          assertThat(e, new IsThrowableMatching().withInstanceOf(IOException.class)
+            .withMessage(startsWith("Invalid onDiskSizeWithHeader=")));
+        }
+        assertEquals(0, consumer.getItemsRead());
+      }
+
+      c.restore();
+      c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX,
+        ByteBuffer.wrap(Bytes.toBytes(0)));
+      logHeader(firstBlock);
+      try (HFileBlockChannelPositionIterator it =
+        new HFileBlockChannelPositionIterator(hFileTestRule)) {
+        CountingConsumer consumer = new CountingConsumer(it);
+        try {
+          consumer.readFully();
+          fail();
+        } catch (Exception e) {
+          assertThat(e, new IsThrowableMatching().withInstanceOf(IllegalArgumentException.class));
+        }
+        assertEquals(0, consumer.getItemsRead());
+      }
+
+      c.restore();
+      c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX,
+        ByteBuffer.wrap(Bytes.toBytes(Integer.MAX_VALUE)));
+      logHeader(firstBlock);
+      try (HFileBlockChannelPositionIterator it =
+        new HFileBlockChannelPositionIterator(hFileTestRule)) {
+        CountingConsumer consumer = new CountingConsumer(it);
+        try {
+          consumer.readFully();
+          fail();
+        } catch (Exception e) {
+          assertThat(e, new IsThrowableMatching().withInstanceOf(IOException.class)
+            .withMessage(startsWith("Invalid onDiskSizeWithHeader=")));
+        }
+        assertEquals(0, consumer.getItemsRead());
+      }
+    } finally {
+      if (firstBlock != null) {
+        firstBlock.close();
+      }
+    }
+  }
+
+  @Test
+  public void testOnDiskSizeWithoutHeaderCorruptionSecondBlock() throws Exception {
+    HFileBlockChannelPosition secondBlock = null;
+    try {
+      try (HFileBlockChannelPositionIterator it =
+        new HFileBlockChannelPositionIterator(hFileTestRule)) {
+        assertTrue(it.hasNext());
+        it.next();
+        assertTrue(it.hasNext());
+        secondBlock = it.next();
+      }
+
+      Corrupter c = new Corrupter(secondBlock);
+
+      logHeader(secondBlock);
+      c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX,
+        ByteBuffer.wrap(Bytes.toBytes(Integer.MIN_VALUE)));
+      logHeader(secondBlock);
+      try (HFileBlockChannelPositionIterator it =
+        new HFileBlockChannelPositionIterator(hFileTestRule)) {
+        CountingConsumer consumer = new CountingConsumer(it);
+        try {
+          consumer.readFully();
+          fail();
+        } catch (Exception e) {
+          assertThat(e, new IsThrowableMatching().withInstanceOf(IOException.class)
+            .withMessage(startsWith("Invalid onDiskSizeWithHeader=")));
+        }
+        assertEquals(1, consumer.getItemsRead());
+      }
+
+      c.restore();
+      c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX,
+        ByteBuffer.wrap(Bytes.toBytes(0)));
+      logHeader(secondBlock);
+      try (HFileBlockChannelPositionIterator it =
+        new HFileBlockChannelPositionIterator(hFileTestRule)) {
+        CountingConsumer consumer = new CountingConsumer(it);
+        try {
+          consumer.readFully();
+          fail();
+        } catch (Exception e) {
+          assertThat(e, new IsThrowableMatching().withInstanceOf(IllegalArgumentException.class));
+        }
+        assertEquals(1, consumer.getItemsRead());
+      }
+
+      c.restore();
+      c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX,
+        ByteBuffer.wrap(Bytes.toBytes(Integer.MAX_VALUE)));
+      logHeader(secondBlock);
+      try (HFileBlockChannelPositionIterator it =
+        new HFileBlockChannelPositionIterator(hFileTestRule)) {
+        CountingConsumer consumer = new CountingConsumer(it);
+        try {
+          consumer.readFully();
+          fail();
+        } catch (Exception e) {
+          assertThat(e, new IsThrowableMatching().withInstanceOf(IOException.class)
+            .withMessage(startsWith("Invalid onDiskSizeWithHeader=")));
+        }
+        assertEquals(1, consumer.getItemsRead());
+      }
+    } finally {
+      if (secondBlock != null) {
+        secondBlock.close();
+      }
+    }
+  }
+
+  private static void logHeader(HFileBlockChannelPosition hbcp) throws IOException {
+    ByteBuff buf = ByteBuff.wrap(ByteBuffer.allocate(HFileBlock.headerSize(true)));
+    hbcp.rewind();
+    assertEquals(buf.capacity(), buf.read(hbcp.getChannel()));
+    buf.rewind();
+    hbcp.rewind();
+    logHeader(buf);
+  }
+
+  private static void logHeader(ByteBuff buf) {
+    byte[] blockMagic = new byte[8];
+    buf.get(blockMagic);
+    int onDiskSizeWithoutHeader = buf.getInt();
+    int uncompressedSizeWithoutHeader = buf.getInt();
+    long prevBlockOffset = buf.getLong();
+    byte checksumType = buf.get();
+    int bytesPerChecksum = buf.getInt();
+    int onDiskDataSizeWithHeader = buf.getInt();
+    LOG.debug(
+      "blockMagic={}, onDiskSizeWithoutHeader={}, 
uncompressedSizeWithoutHeader={}, "
+        + "prevBlockOffset={}, checksumType={}, bytesPerChecksum={}, 
onDiskDataSizeWithHeader={}",
+      Bytes.toStringBinary(blockMagic), onDiskSizeWithoutHeader, 
uncompressedSizeWithoutHeader,
+      prevBlockOffset, checksumType, bytesPerChecksum, 
onDiskDataSizeWithHeader);
+  }
+
+  /**
+   * Data class to enable messing with the bytes behind an {@link HFileBlock}.
+   */
+  public static class HFileBlockChannelPosition implements Closeable {
+    private final SeekableByteChannel channel;
+    private final long position;
+
+    public HFileBlockChannelPosition(SeekableByteChannel channel, long position) {
+      this.channel = channel;
+      this.position = position;
+    }
+
+    public SeekableByteChannel getChannel() {
+      return channel;
+    }
+
+    public long getPosition() {
+      return position;
+    }
+
+    public void rewind() throws IOException {
+      channel.position(position);
+    }
+
+    @Override
+    public void close() throws IOException {
+      channel.close();
+    }
+  }
+
+  /**
+   * Reads blocks off of an {@link HFileBlockChannelPositionIterator}, counting them as it does.
+   */
+  public static class CountingConsumer {
+    private final HFileBlockChannelPositionIterator iterator;
+    private int itemsRead = 0;
+
+    public CountingConsumer(HFileBlockChannelPositionIterator iterator) {
+      this.iterator = iterator;
+    }
+
+    public int getItemsRead() {
+      return itemsRead;
+    }
+
+    public Object readFully() throws IOException {
+      Object val = null;
+      for (itemsRead = 0; iterator.hasNext(); itemsRead++) {
+        val = iterator.next();
+      }
+      return val;
+    }
+  }
+
+  /**
+   * A simplified wrapper over an {@link HFileBlock.BlockIterator} that looks a lot like an
+   * {@link java.util.Iterator}.
+   */
+  public static class HFileBlockChannelPositionIterator implements Closeable {
+
+    private final HFileTestRule hFileTestRule;
+    private final HFile.Reader reader;
+    private final HFileBlock.BlockIterator iter;
+    private HFileBlockChannelPosition current = null;
+
+    public HFileBlockChannelPositionIterator(HFileTestRule hFileTestRule) throws IOException {
+      Configuration conf = hFileTestRule.getConfiguration();
+      HFileSystem hfs = hFileTestRule.getHFileSystem();
+      Path hfsPath = hFileTestRule.getPath();
+
+      HFile.Reader reader = null;
+      HFileBlock.BlockIterator iter = null;
+      try {
+        reader = HFile.createReader(hfs, hfsPath, CacheConfig.DISABLED, true, conf);
+        HFileBlock.FSReader fsreader = reader.getUncachedBlockReader();
+        iter = fsreader.blockRange(0, hfs.getFileStatus(hfsPath).getLen());
+      } catch (IOException e) {
+        if (reader != null) {
+          closeQuietly(reader::close);
+        }
+        throw e;
+      }
+
+      this.hFileTestRule = hFileTestRule;
+      this.reader = reader;
+      this.iter = iter;
+    }
+
+    public boolean hasNext() throws IOException {
+      HFileBlock next = iter.nextBlock();
+      SeekableByteChannel channel = hFileTestRule.getRWChannel();
+      if (next != null) {
+        current = new HFileBlockChannelPosition(channel, next.getOffset());
+      }
+      return next != null;
+    }
+
+    public HFileBlockChannelPosition next() {
+      if (current == null) {
+        throw new NoSuchElementException();
+      }
+      HFileBlockChannelPosition ret = current;
+      current = null;
+      return ret;
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (current != null) {
+        closeQuietly(current::close);
+      }
+      closeQuietly(reader::close);
+    }
+
+    @FunctionalInterface
+    private interface CloseMethod {
+      void run() throws IOException;
+    }
+
+    private static void closeQuietly(CloseMethod closeMethod) {
+      try {
+        closeMethod.run();
+      } catch (Throwable e) {
+        LOG.debug("Ignoring thrown exception.", e);
+      }
+    }
+  }
+
+  /**
+   * Enables writing and rewriting portions of the file backing an {@link HFileBlock}.
+   */
+  public static class Corrupter {
+
+    private final HFileBlockChannelPosition channelAndPosition;
+    private final ByteBuffer originalHeader;
+
+    public Corrupter(HFileBlockChannelPosition channelAndPosition) throws IOException {
+      this.channelAndPosition = channelAndPosition;
+      this.originalHeader = readHeaderData(channelAndPosition);
+    }
+
+    private static ByteBuffer readHeaderData(HFileBlockChannelPosition channelAndPosition)
+      throws IOException {
+      SeekableByteChannel channel = channelAndPosition.getChannel();
+      ByteBuffer originalHeader = ByteBuffer.allocate(HFileBlock.headerSize(true));
+      channelAndPosition.rewind();
+      channel.read(originalHeader);
+      return originalHeader;
+    }
+
+    public void write(int offset, ByteBuffer src) throws IOException {
+      SeekableByteChannel channel = channelAndPosition.getChannel();
+      long position = channelAndPosition.getPosition();
+      channel.position(position + offset);
+      channel.write(src);
+    }
+
+    public void restore() throws IOException {
+      SeekableByteChannel channel = channelAndPosition.getChannel();
+      originalHeader.rewind();
+      channelAndPosition.rewind();
+      assertEquals(originalHeader.capacity(), channel.write(originalHeader));
+    }
+  }
+
+  public static class HFileTestRule extends ExternalResource {
+
+    private final HBaseTestingUtil testingUtility;
+    private final HFileSystem hfs;
+    private final HFileContext context;
+    private final TestName testName;
+    private Path path;
+
+    public HFileTestRule(HBaseTestingUtil testingUtility, TestName testName) throws IOException {
+      this.testingUtility = testingUtility;
+      this.testName = testName;
+      this.hfs = (HFileSystem) HFileSystem.get(testingUtility.getConfiguration());
+      this.context =
+        new HFileContextBuilder().withBlockSize(4 * 1024).withHBaseCheckSum(true).build();
+    }
+
+    public Configuration getConfiguration() {
+      return testingUtility.getConfiguration();
+    }
+
+    public HFileSystem getHFileSystem() {
+      return hfs;
+    }
+
+    public HFileContext getHFileContext() {
+      return context;
+    }
+
+    public Path getPath() {
+      return path;
+    }
+
+    public SeekableByteChannel getRWChannel() throws IOException {
+      java.nio.file.Path p = FileSystems.getDefault().getPath(path.toString());
+      return Files.newByteChannel(p, StandardOpenOption.READ, StandardOpenOption.WRITE,
+        StandardOpenOption.DSYNC);
+    }
+
+    @Override
+    protected void before() throws Throwable {
+      this.path = new Path(testingUtility.getDataTestDirOnTestFS(), testName.getMethodName());
+      HFile.WriterFactory factory =
+        HFile.getWriterFactory(testingUtility.getConfiguration(), CacheConfig.DISABLED)
+          .withPath(hfs, path).withFileContext(context);
+
+      CellBuilder cellBuilder = CellBuilderFactory.create(CellBuilderType.DEEP_COPY);
+      Random rand = new Random(Instant.now().toEpochMilli());
+      byte[] family = Bytes.toBytes("f");
+      try (HFile.Writer writer = factory.create()) {
+        for (int i = 0; i < 40; i++) {
+          byte[] row = RandomKeyValueUtil.randomOrderedFixedLengthKey(rand, i, 100);
+          byte[] qualifier = RandomKeyValueUtil.randomRowOrQualifier(rand);
+          byte[] value = RandomKeyValueUtil.randomValue(rand);
+          Cell cell = cellBuilder.setType(Cell.Type.Put).setRow(row).setFamily(family)
+            .setQualifier(qualifier).setValue(value).build();
+          writer.append(cell);
+          cellBuilder.clear();
+        }
+      }
+    }
+  }
+
+  /**
+   * A Matcher implementation that can make basic assertions over a provided {@link Throwable}.
+   * Assertion failures include the full stacktrace in their description.
+   */
+  private static final class IsThrowableMatching extends TypeSafeMatcher<Throwable> {
+
+    private final List<Matcher<? super Throwable>> requirements = new LinkedList<>();
+
+    public IsThrowableMatching withInstanceOf(Class<?> type) {
+      requirements.add(instanceOf(type));
+      return this;
+    }
+
+    public IsThrowableMatching withMessage(Matcher<String> matcher) {
+      requirements.add(hasProperty("message", matcher));
+      return this;
+    }
+
+    @Override
+    protected boolean matchesSafely(Throwable throwable) {
+      return allOf(requirements).matches(throwable);
+    }
+
+    @Override
+    protected void describeMismatchSafely(Throwable item, Description mismatchDescription) {
+      allOf(requirements).describeMismatch(item, mismatchDescription);
+      // would be nice if `item` could be provided as the cause of the AssertionError instead.
+      mismatchDescription.appendText(String.format("%nProvided: "));
+      try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+        try (PrintStream ps = new PrintStream(baos, false, StandardCharsets.UTF_8.name())) {
+          item.printStackTrace(ps);
+          ps.flush();
+        }
+        mismatchDescription.appendText(baos.toString(StandardCharsets.UTF_8.name()));
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    @Override
+    public void describeTo(Description description) {
+      description.appendDescriptionOf(allOf(requirements));
+    }
+  }
+}
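
For reference, a minimal sketch of the corruption pattern the new test exercises,
reduced to plain java.nio: overwrite the 4-byte onDiskSizeWithoutHeader field, which
sits immediately after the 8-byte block magic, at a known block offset. The class name,
file path, and offset below are hypothetical; the test's Corrupter above does the
equivalent against the HFile written by HFileTestRule.

    // Sketch only; not part of the patch.
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.SeekableByteChannel;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.nio.file.StandardOpenOption;

    public final class HeaderCorruptionSketch {

      // onDiskSizeWithoutHeader starts right after the 8-byte block magic
      // (assumed layout, consistent with logHeader above).
      private static final int ON_DISK_SIZE_WITHOUT_HEADER_INDEX = 8;

      /** Overwrite the on-disk size field of the block that starts at blockOffset. */
      static void corruptOnDiskSize(String hfilePath, long blockOffset, int newValue)
        throws IOException {
        try (SeekableByteChannel ch = Files.newByteChannel(Paths.get(hfilePath),
          StandardOpenOption.READ, StandardOpenOption.WRITE)) {
          ByteBuffer fourBytes = ByteBuffer.allocate(Integer.BYTES).putInt(newValue);
          fourBytes.flip(); // prepare the buffer for writing out
          ch.position(blockOffset + ON_DISK_SIZE_WITHOUT_HEADER_INDEX);
          ch.write(fourBytes);
        }
      }

      public static void main(String[] args) throws IOException {
        // Hypothetical usage: corrupt the first block (offset 0) of a scratch HFile copy.
        corruptOnDiskSize("/tmp/scratch-hfile", 0L, Integer.MIN_VALUE);
      }
    }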
