ndimiduk commented on code in PR #5384:
URL: https://github.com/apache/hbase/pull/5384#discussion_r1321481778


##########
hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockHeaderCorruption.java:
##########
@@ -0,0 +1,533 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+import java.time.Instant;
+import java.util.NoSuchElementException;
+import java.util.Random;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellBuilder;
+import org.apache.hadoop.hbase.CellBuilderFactory;
+import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExternalResource;
+import org.junit.rules.RuleChain;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestHFileBlockHeaderCorruption {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestHFileBlockHeaderCorruption.class);
+
+  private final HFileTestRule hFileTestRule;
+
+  @Rule
+  public final RuleChain ruleChain;
+
+  public TestHFileBlockHeaderCorruption() throws IOException {
+    TestName testName = new TestName();
+    hFileTestRule = new HFileTestRule(new HBaseTestingUtility(), testName);
+    ruleChain = RuleChain.outerRule(testName).around(hFileTestRule);
+  }
+
+  @Test
+  public void testBlockMagicCorruptionFirstBlock() throws Exception {
+    try (
+      HFileBlockChannelPositionIterator it = new 
HFileBlockChannelPositionIterator(hFileTestRule)) {
+      assertTrue(it.hasNext());
+      try (HFileBlockChannelPosition firstBlock = it.next()) {
+        Corrupter c = new Corrupter(firstBlock);
+        c.write(HFileBlock.Header.BLOCK_MAGIC_INDEX, 
ByteBuffer.wrap(Bytes.toBytes("_GARBAGE")));
+      }
+    }
+    try (
+      HFileBlockChannelPositionIterator it = new 
HFileBlockChannelPositionIterator(hFileTestRule)) {
+      CountingConsumer consumer = new CountingConsumer(it);
+      try {
+        consumer.readFully();
+        fail();
+      } catch (Exception e) {
+        assertTrue(e instanceof IOException);
+        assertEquals("Invalid HFile block magic: _GARBAGE", e.getMessage());
+      }
+      assertEquals(0, consumer.getItemsRead());
+    }
+  }
+
+  @Test
+  public void testBlockMagicCorruptionSecondBlock() throws Exception {
+    try (
+      HFileBlockChannelPositionIterator it = new 
HFileBlockChannelPositionIterator(hFileTestRule)) {
+      assertTrue(it.hasNext());
+      it.next();
+      assertTrue(it.hasNext());
+      try (HFileBlockChannelPosition secondBlock = it.next()) {
+        Corrupter c = new Corrupter(secondBlock);
+        c.write(HFileBlock.Header.BLOCK_MAGIC_INDEX, 
ByteBuffer.wrap(Bytes.toBytes("_GARBAGE")));
+      }
+    }
+    try (
+      HFileBlockChannelPositionIterator it = new 
HFileBlockChannelPositionIterator(hFileTestRule)) {
+      CountingConsumer consumer = new CountingConsumer(it);
+      try {
+        consumer.readFully();
+        fail();
+      } catch (Exception e) {
+        assertTrue(e instanceof IOException);
+        assertEquals("Invalid HFile block magic: _GARBAGE", e.getMessage());
+      }
+      assertEquals(1, consumer.getItemsRead());
+    }
+  }
+
+  @Test
+  public void testOnDiskSizeWithoutHeaderCorruptionFirstBlock() throws 
Exception {
+    HFileBlockChannelPosition firstBlock = null;
+    try {
+      try (HFileBlockChannelPositionIterator it =
+        new HFileBlockChannelPositionIterator(hFileTestRule)) {
+        assertTrue(it.hasNext());
+        firstBlock = it.next();
+      }
+
+      Corrupter c = new Corrupter(firstBlock);
+
+      logHeader(firstBlock);
+      c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX,
+        ByteBuffer.wrap(Bytes.toBytes(Integer.MIN_VALUE)));
+      logHeader(firstBlock);
+      try (HFileBlockChannelPositionIterator it =
+        new HFileBlockChannelPositionIterator(hFileTestRule)) {
+        CountingConsumer consumer = new CountingConsumer(it);
+        try {
+          consumer.readFully();
+          fail();
+        } catch (Exception e) {
+          assertTrue(e instanceof IOException);
+          assertTrue(e.getMessage().startsWith("Read invalid 
onDiskSizeWithHeader="));
+        }
+        assertEquals(0, consumer.getItemsRead());
+      }
+
+      c.restore();
+      c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX,
+        ByteBuffer.wrap(Bytes.toBytes(0)));
+      logHeader(firstBlock);
+      try (HFileBlockChannelPositionIterator it =
+        new HFileBlockChannelPositionIterator(hFileTestRule)) {
+        CountingConsumer consumer = new CountingConsumer(it);
+        try {
+          consumer.readFully();
+          fail();
+        } catch (Exception e) {
+          assertTrue(e instanceof IllegalArgumentException);
+          assertTrue(e.getMessage().startsWith("newLimit > capacity"));
+        }
+        assertEquals(0, consumer.getItemsRead());
+      }
+
+      c.restore();
+      c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX,
+        ByteBuffer.wrap(Bytes.toBytes(Integer.MAX_VALUE)));
+      logHeader(firstBlock);
+      try (HFileBlockChannelPositionIterator it =
+        new HFileBlockChannelPositionIterator(hFileTestRule)) {
+        CountingConsumer consumer = new CountingConsumer(it);
+        try {
+          consumer.readFully();
+          fail();
+        } catch (Exception e) {
+          assertTrue(e instanceof IOException);
+          assertTrue(e.getMessage().startsWith("Read invalid 
onDiskSizeWithHeader="));
+        }
+        assertEquals(0, consumer.getItemsRead());
+      }
+    } finally {
+      if (firstBlock != null) {
+        firstBlock.close();
+      }
+    }
+  }
+
+  @Test
+  public void testOnDiskSizeWithoutHeaderCorruptionSecondBlock() throws 
Exception {
+    HFileBlockChannelPosition secondBlock = null;
+    try {
+      try (HFileBlockChannelPositionIterator it =
+        new HFileBlockChannelPositionIterator(hFileTestRule)) {
+        assertTrue(it.hasNext());
+        it.next();
+        assertTrue(it.hasNext());
+        secondBlock = it.next();
+      }
+
+      Corrupter c = new Corrupter(secondBlock);
+
+      logHeader(secondBlock);
+      c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX,
+        ByteBuffer.wrap(Bytes.toBytes(Integer.MIN_VALUE)));
+      logHeader(secondBlock);
+      try (HFileBlockChannelPositionIterator it =
+        new HFileBlockChannelPositionIterator(hFileTestRule)) {
+        CountingConsumer consumer = new CountingConsumer(it);
+        try {
+          consumer.readFully();
+          fail();
+        } catch (Exception e) {
+          assertTrue(e instanceof IOException);
+          assertTrue(e.getMessage().startsWith("Caller provided invalid 
onDiskSizeWithHeaderL="));
+        }
+        assertEquals(1, consumer.getItemsRead());
+      }
+
+      c.restore();
+      c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX,
+        ByteBuffer.wrap(Bytes.toBytes(0)));
+      logHeader(secondBlock);
+      try (HFileBlockChannelPositionIterator it =
+        new HFileBlockChannelPositionIterator(hFileTestRule)) {
+        CountingConsumer consumer = new CountingConsumer(it);
+        try {
+          consumer.readFully();
+          fail();
+        } catch (Exception e) {
+          assertTrue(e instanceof IllegalArgumentException);
+          assertTrue(e.getMessage().startsWith("newLimit > capacity"));
+        }
+        assertEquals(1, consumer.getItemsRead());
+      }
+
+      c.restore();
+      c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX,
+        ByteBuffer.wrap(Bytes.toBytes(Integer.MAX_VALUE)));
+      logHeader(secondBlock);
+      try (HFileBlockChannelPositionIterator it =
+        new HFileBlockChannelPositionIterator(hFileTestRule)) {
+        CountingConsumer consumer = new CountingConsumer(it);
+        try {
+          consumer.readFully();
+          fail();
+        } catch (Exception e) {
+          assertTrue(e instanceof IOException);
+          assertTrue(e.getMessage().startsWith("Caller provided invalid 
onDiskSizeWithHeaderL="));
+        }
+        assertEquals(1, consumer.getItemsRead());
+      }
+    } finally {
+      if (secondBlock != null) {
+        secondBlock.close();
+      }
+    }
+  }
+
+  @Test
+  public void testUncompressedSizeWithoutHeaderCorruption() throws Exception {
+
+  }
+
+  @Test
+  public void testPrevBlockOffsetCorruption() throws Exception {
+
+  }
+
+  @Test
+  public void testChecksumTypeCorruption() throws Exception {
+
+  }
+
+  @Test
+  public void testBytesPerChecksumCorruption() throws Exception {
+
+  }
+
+  @Test
+  public void testOnDiskDataSizeWithHeaderCorruption() throws Exception {
+
+  }
+
+  private static void logHeader(HFileBlockChannelPosition hbcp) throws 
IOException {
+    ByteBuff buf = 
ByteBuff.wrap(ByteBuffer.allocate(HFileBlock.headerSize(true)));
+    hbcp.rewind();
+    assertEquals(buf.capacity(), buf.read(hbcp.getChannel()));
+    buf.rewind();
+    hbcp.rewind();
+    logHeader(buf);
+  }
+
+  private static void logHeader(ByteBuff buf) {
+    byte[] blockMagic = new byte[8];
+    buf.get(blockMagic);
+    int onDiskSizeWithoutHeader = buf.getInt();
+    int uncompressedSizeWithoutHeader = buf.getInt();
+    long prevBlockOffset = buf.getLong();
+    byte checksumType = buf.get();
+    int bytesPerChecksum = buf.getInt();
+    int onDiskDataSizeWithHeader = buf.getInt();
+    LOG.debug(
+      "blockMagic={}, onDiskSizeWithoutHeader={}, 
uncompressedSizeWithoutHeader={}, "
+        + "prevBlockOffset={}, checksumType={}, bytesPerChecksum={}, 
onDiskDataSizeWithHeader={}",
+      Bytes.toStringBinary(blockMagic), onDiskSizeWithoutHeader, 
uncompressedSizeWithoutHeader,
+      prevBlockOffset, checksumType, bytesPerChecksum, 
onDiskDataSizeWithHeader);
+  }
+
+  /**
+   * Data class to enable messing with the bytes behind an {@link HFileBlock}.
+   */
+  public static class HFileBlockChannelPosition implements Closeable {
+    private final SeekableByteChannel channel;
+    private final long position;
+
+    public HFileBlockChannelPosition(SeekableByteChannel channel, long 
position) {
+      this.channel = channel;
+      this.position = position;
+    }
+
+    public SeekableByteChannel getChannel() {
+      return channel;
+    }
+
+    public long getPosition() {
+      return position;
+    }
+
+    public void rewind() throws IOException {
+      channel.position(position);
+    }
+
+    @Override
+    public void close() throws IOException {
+      channel.close();
+    }
+  }
+
+  /**
+   * Reads blocks off of an {@link HFileBlockChannelPositionIterator}, 
counting them as it does.
+   */
+  public static class CountingConsumer {
+    private final HFileBlockChannelPositionIterator iterator;
+    private int itemsRead = 0;
+
+    public CountingConsumer(HFileBlockChannelPositionIterator iterator) {
+      this.iterator = iterator;
+    }
+
+    public int getItemsRead() {
+      return itemsRead;
+    }
+
+    public Object readFully() throws IOException {
+      Object val = null;
+      for (itemsRead = 0; iterator.hasNext(); itemsRead++) {
+        val = iterator.next();
+      }
+      return val;
+    }
+  }
+
+  /**
+   * A simplified wrapper over an {@link HFileBlock.BlockIterator} that looks 
a lot like an
+   * {@link java.util.Iterator}.
+   */
+  public static class HFileBlockChannelPositionIterator implements Closeable {
+
+    private final HFileTestRule hFileTestRule;
+    private final HFile.Reader reader;
+    private final HFileBlock.BlockIterator iter;
+    private HFileBlockChannelPosition current = null;
+
+    public HFileBlockChannelPositionIterator(HFileTestRule hFileTestRule) 
throws IOException {
+      Configuration conf = hFileTestRule.getConfiguration();
+      HFileSystem hfs = hFileTestRule.getHFileSystem();
+      Path hfsPath = hFileTestRule.getPath();
+
+      HFile.Reader reader = null;
+      HFileBlock.BlockIterator iter = null;
+      try {
+        reader = HFile.createReader(hfs, hfsPath, CacheConfig.DISABLED, true, 
conf);

Review Comment:
   It seems like there are a couple of ways to read through an HFile -- this is 
roughly how the `HFilePrettyPrinter` does it. Is this how we want to do it in 
this test?
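   For reference, a minimal sketch of the block-level walk this test performs, 
which is also roughly the low-level path `HFilePrettyPrinter` takes. It only 
uses calls that already appear in the diff above; the surrounding setup 
(`conf`, `hfs`, `hfsPath`) is assumed to come from the test rule.

   ```java
   // Sketch: iterate raw HFileBlocks via the uncached block reader.
   HFile.Reader reader = HFile.createReader(hfs, hfsPath, CacheConfig.DISABLED, true, conf);
   try {
     HFileBlock.FSReader fsreader = reader.getUncachedBlockReader();
     HFileBlock.BlockIterator iter = fsreader.blockRange(0, hfs.getFileStatus(hfsPath).getLen());
     for (HFileBlock block = iter.nextBlock(); block != null; block = iter.nextBlock()) {
       // Each block exposes its file offset, which is all the corrupter needs.
       LOG.debug("block at offset {}", block.getOffset());
     }
   } finally {
     reader.close();
   }
   ```

   A cell-level read through an `HFileScanner` would be the other obvious 
approach, but it wouldn't expose the block offsets that the corrupter needs.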



##########
hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockHeaderCorruption.java:
##########
@@ -0,0 +1,533 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+import java.time.Instant;
+import java.util.NoSuchElementException;
+import java.util.Random;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellBuilder;
+import org.apache.hadoop.hbase.CellBuilderFactory;
+import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExternalResource;
+import org.junit.rules.RuleChain;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestHFileBlockHeaderCorruption {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestHFileBlockHeaderCorruption.class);
+
+  private final HFileTestRule hFileTestRule;
+
+  @Rule
+  public final RuleChain ruleChain;
+
+  public TestHFileBlockHeaderCorruption() throws IOException {
+    TestName testName = new TestName();
+    hFileTestRule = new HFileTestRule(new HBaseTestingUtility(), testName);
+    ruleChain = RuleChain.outerRule(testName).around(hFileTestRule);
+  }
+
+  @Test
+  public void testBlockMagicCorruptionFirstBlock() throws Exception {
+    try (
+      HFileBlockChannelPositionIterator it = new 
HFileBlockChannelPositionIterator(hFileTestRule)) {
+      assertTrue(it.hasNext());
+      try (HFileBlockChannelPosition firstBlock = it.next()) {
+        Corrupter c = new Corrupter(firstBlock);
+        c.write(HFileBlock.Header.BLOCK_MAGIC_INDEX, 
ByteBuffer.wrap(Bytes.toBytes("_GARBAGE")));
+      }
+    }
+    try (
+      HFileBlockChannelPositionIterator it = new 
HFileBlockChannelPositionIterator(hFileTestRule)) {
+      CountingConsumer consumer = new CountingConsumer(it);
+      try {
+        consumer.readFully();
+        fail();
+      } catch (Exception e) {
+        assertTrue(e instanceof IOException);
+        assertEquals("Invalid HFile block magic: _GARBAGE", e.getMessage());
+      }
+      assertEquals(0, consumer.getItemsRead());
+    }
+  }
+
+  @Test
+  public void testBlockMagicCorruptionSecondBlock() throws Exception {
+    try (
+      HFileBlockChannelPositionIterator it = new 
HFileBlockChannelPositionIterator(hFileTestRule)) {
+      assertTrue(it.hasNext());
+      it.next();
+      assertTrue(it.hasNext());
+      try (HFileBlockChannelPosition secondBlock = it.next()) {
+        Corrupter c = new Corrupter(secondBlock);
+        c.write(HFileBlock.Header.BLOCK_MAGIC_INDEX, 
ByteBuffer.wrap(Bytes.toBytes("_GARBAGE")));
+      }
+    }
+    try (
+      HFileBlockChannelPositionIterator it = new 
HFileBlockChannelPositionIterator(hFileTestRule)) {
+      CountingConsumer consumer = new CountingConsumer(it);
+      try {
+        consumer.readFully();
+        fail();
+      } catch (Exception e) {
+        assertTrue(e instanceof IOException);
+        assertEquals("Invalid HFile block magic: _GARBAGE", e.getMessage());
+      }
+      assertEquals(1, consumer.getItemsRead());
+    }
+  }
+
+  @Test
+  public void testOnDiskSizeWithoutHeaderCorruptionFirstBlock() throws 
Exception {
+    HFileBlockChannelPosition firstBlock = null;
+    try {
+      try (HFileBlockChannelPositionIterator it =
+        new HFileBlockChannelPositionIterator(hFileTestRule)) {
+        assertTrue(it.hasNext());
+        firstBlock = it.next();
+      }
+
+      Corrupter c = new Corrupter(firstBlock);
+
+      logHeader(firstBlock);
+      c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX,
+        ByteBuffer.wrap(Bytes.toBytes(Integer.MIN_VALUE)));
+      logHeader(firstBlock);
+      try (HFileBlockChannelPositionIterator it =
+        new HFileBlockChannelPositionIterator(hFileTestRule)) {
+        CountingConsumer consumer = new CountingConsumer(it);
+        try {
+          consumer.readFully();
+          fail();
+        } catch (Exception e) {
+          assertTrue(e instanceof IOException);
+          assertTrue(e.getMessage().startsWith("Read invalid 
onDiskSizeWithHeader="));
+        }
+        assertEquals(0, consumer.getItemsRead());
+      }
+
+      c.restore();
+      c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX,
+        ByteBuffer.wrap(Bytes.toBytes(0)));
+      logHeader(firstBlock);
+      try (HFileBlockChannelPositionIterator it =
+        new HFileBlockChannelPositionIterator(hFileTestRule)) {
+        CountingConsumer consumer = new CountingConsumer(it);
+        try {
+          consumer.readFully();
+          fail();
+        } catch (Exception e) {
+          assertTrue(e instanceof IllegalArgumentException);
+          assertTrue(e.getMessage().startsWith("newLimit > capacity"));
+        }
+        assertEquals(0, consumer.getItemsRead());
+      }
+
+      c.restore();
+      c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX,
+        ByteBuffer.wrap(Bytes.toBytes(Integer.MAX_VALUE)));
+      logHeader(firstBlock);
+      try (HFileBlockChannelPositionIterator it =
+        new HFileBlockChannelPositionIterator(hFileTestRule)) {
+        CountingConsumer consumer = new CountingConsumer(it);
+        try {
+          consumer.readFully();
+          fail();
+        } catch (Exception e) {
+          assertTrue(e instanceof IOException);
+          assertTrue(e.getMessage().startsWith("Read invalid 
onDiskSizeWithHeader="));
+        }
+        assertEquals(0, consumer.getItemsRead());
+      }
+    } finally {
+      if (firstBlock != null) {
+        firstBlock.close();
+      }
+    }
+  }
+
+  @Test
+  public void testOnDiskSizeWithoutHeaderCorruptionSecondBlock() throws 
Exception {
+    HFileBlockChannelPosition secondBlock = null;
+    try {
+      try (HFileBlockChannelPositionIterator it =
+        new HFileBlockChannelPositionIterator(hFileTestRule)) {
+        assertTrue(it.hasNext());
+        it.next();
+        assertTrue(it.hasNext());
+        secondBlock = it.next();
+      }
+
+      Corrupter c = new Corrupter(secondBlock);
+
+      logHeader(secondBlock);
+      c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX,
+        ByteBuffer.wrap(Bytes.toBytes(Integer.MIN_VALUE)));
+      logHeader(secondBlock);
+      try (HFileBlockChannelPositionIterator it =
+        new HFileBlockChannelPositionIterator(hFileTestRule)) {
+        CountingConsumer consumer = new CountingConsumer(it);
+        try {
+          consumer.readFully();
+          fail();
+        } catch (Exception e) {
+          assertTrue(e instanceof IOException);
+          assertTrue(e.getMessage().startsWith("Caller provided invalid 
onDiskSizeWithHeaderL="));
+        }
+        assertEquals(1, consumer.getItemsRead());
+      }
+
+      c.restore();
+      c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX,
+        ByteBuffer.wrap(Bytes.toBytes(0)));
+      logHeader(secondBlock);
+      try (HFileBlockChannelPositionIterator it =
+        new HFileBlockChannelPositionIterator(hFileTestRule)) {
+        CountingConsumer consumer = new CountingConsumer(it);
+        try {
+          consumer.readFully();
+          fail();
+        } catch (Exception e) {
+          assertTrue(e instanceof IllegalArgumentException);
+          assertTrue(e.getMessage().startsWith("newLimit > capacity"));
+        }
+        assertEquals(1, consumer.getItemsRead());
+      }
+
+      c.restore();
+      c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX,
+        ByteBuffer.wrap(Bytes.toBytes(Integer.MAX_VALUE)));
+      logHeader(secondBlock);
+      try (HFileBlockChannelPositionIterator it =
+        new HFileBlockChannelPositionIterator(hFileTestRule)) {
+        CountingConsumer consumer = new CountingConsumer(it);
+        try {
+          consumer.readFully();
+          fail();
+        } catch (Exception e) {
+          assertTrue(e instanceof IOException);
+          assertTrue(e.getMessage().startsWith("Caller provided invalid 
onDiskSizeWithHeaderL="));
+        }
+        assertEquals(1, consumer.getItemsRead());
+      }
+    } finally {
+      if (secondBlock != null) {
+        secondBlock.close();
+      }
+    }
+  }
+
+  @Test
+  public void testUncompressedSizeWithoutHeaderCorruption() throws Exception {

Review Comment:
   I stubbed out test methods for all of the Header fields, but if we're careful 
we don't need to test all of them, because only a couple of fields are read 
before the header checksum has been validated. Once the dust settles, I'll 
comment the test class with notes about which header fields are tested and why, 
dropping the rest.
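   For reference, a hedged sketch of the header layout those fields come from. 
The offsets below are derived from the field order and sizes in `logHeader()` 
further down; I'm assuming they line up with the `HFileBlock.Header` index 
constants, so treat the numbers as illustrative.

   ```java
   // Header layout implied by logHeader(): each offset is the running sum of the
   // preceding field sizes. 33 bytes total when checksum support is enabled.
   int blockMagicIndex = 0;                      // 8-byte magic
   int onDiskSizeWithoutHeaderIndex = 8;         // int
   int uncompressedSizeWithoutHeaderIndex = 12;  // int
   int prevBlockOffsetIndex = 16;                // long
   int checksumTypeIndex = 24;                   // byte
   int bytesPerChecksumIndex = 25;               // int
   int onDiskDataSizeWithHeaderIndex = 29;       // int; header ends at byte 33
   ```

   As far as I can tell, the two fields with full tests above -- the block magic 
and onDiskSizeWithoutHeader -- are the only ones consulted before the header 
checksum has been verified.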



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java:
##########
@@ -1667,7 +1688,11 @@ protected HFileBlock 
readBlockDataInternal(FSDataInputStream is, long offset,
       final AttributesBuilder attributesBuilder = Attributes.builder();
       Optional.of(Context.current()).map(val -> val.get(CONTEXT_KEY))
         .ifPresent(c -> c.accept(attributesBuilder));
-      int onDiskSizeWithHeader = checkAndGetSizeAsInt(onDiskSizeWithHeaderL, 
hdrSize);
+      if (!checkOnDiskSizeWithHeader(onDiskSizeWithHeaderL)) {

Review Comment:
   In this patch I'm trying to make as targeted a change as I can, so that I can 
confidently back-port it to all active branches. I was thinking I/we could 
follow up with a more invasive restructure that would target only `master` and 
`branch-3`, maybe `branch-2` also.
   
   Let me see how things look with further consolidation of the value 
resolution logic.



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java:
##########
@@ -1697,8 +1722,27 @@ protected HFileBlock 
readBlockDataInternal(FSDataInputStream is, long offset,
           headerBuf = HEAP.allocate(hdrSize);
           readAtOffset(is, headerBuf, hdrSize, false, offset, pread);
           headerBuf.rewind();
+
+          // The caller didn't provide an anticipated block size and headerBuf 
was null, this is
+          // probably the first time this HDFS block has been read. The value 
we just read has not
+          // had HBase checksum validation; assume it was not protected by 
HDFS checksum either.
+          // Sanity check the value. If it doesn't seem right, either trigger 
fall-back to hdfs
+          // checksum or abort the read.
+          //
+          // TODO: Should we also check the value vs. some multiple of 
fileContext.getBlocksize() ?
+          onDiskSizeWithHeader = getOnDiskSizeWithHeader(headerBuf, 
checksumSupport);
+          if (!checkOnDiskSizeWithHeader(onDiskSizeWithHeader)) {
+            if (verifyChecksum) {
+              invalidateNextBlockHeader();
+              span.addEvent("Falling back to HDFS checksumming.", 
attributesBuilder.build());
+              return null;
+            } else {
+              throw new IOException("Read invalid onDiskSizeWithHeader=" + 
onDiskSizeWithHeader);
+            }
+          }
+        } else {
+          onDiskSizeWithHeader = getOnDiskSizeWithHeader(headerBuf, 
checksumSupport);

Review Comment:
   > i think we can still possibly cache corrupt headers.
   
   I think you're correct.
   
   > im not sure we can checksum just the header
   
   We can, but only by reading most of the next block -- `bytesPerChecksum` is 
written out in every block header, but it's also in the `HFileContext`. 
Presumably we could over-read by some multiple of `bytesPerChecksum` sufficient 
to cover `headerSize`. But checksums are after the data section, so we'd have 
to over-read most of the next block in order to validate.
   
   > so i might suggest we add one more call to ...
   
   We can, but if it is suspect we have no way to hint early that the next read 
must fall back to HDFS checksums. At best, the next time we read a block, we'd 
end up right back here without the prefetched header, do the pre-fetch again, 
validate again, but this time be able to do something about it.
   
   Best to always assume a pre-fetched header is suspect.
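   To put rough numbers on "most of the next block", here's an illustrative 
calculation with assumed defaults, not values taken from this PR:

   ```java
   // Illustrative only: 64 KiB of block data, 16 KiB bytesPerChecksum, and a
   // 33-byte header (checksum support enabled).
   int hdrSize = 33;
   int dataSize = 64 * 1024;
   int bytesPerChecksum = 16 * 1024;

   // Checksums are computed over bytesPerChecksum-sized chunks of (header + data)
   // and written after the data section. The header lives in the first chunk, but
   // that chunk's 4-byte checksum sits at offset (hdrSize + dataSize).
   int firstChecksumOffset = hdrSize + dataSize;            // 65569
   long bytesToVerifyHeaderOnly = firstChecksumOffset + 4L; // essentially the whole block
   ```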



##########
hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockHeaderCorruption.java:
##########
@@ -0,0 +1,533 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+import java.time.Instant;
+import java.util.NoSuchElementException;
+import java.util.Random;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellBuilder;
+import org.apache.hadoop.hbase.CellBuilderFactory;
+import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExternalResource;
+import org.junit.rules.RuleChain;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestHFileBlockHeaderCorruption {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestHFileBlockHeaderCorruption.class);
+
+  private final HFileTestRule hFileTestRule;
+
+  @Rule
+  public final RuleChain ruleChain;
+
+  public TestHFileBlockHeaderCorruption() throws IOException {
+    TestName testName = new TestName();
+    hFileTestRule = new HFileTestRule(new HBaseTestingUtility(), testName);
+    ruleChain = RuleChain.outerRule(testName).around(hFileTestRule);
+  }
+
+  @Test
+  public void testBlockMagicCorruptionFirstBlock() throws Exception {

Review Comment:
   First and second block access patterns are slightly different because the 
caller has more block location data for the second block, thanks to the 
next-header over-read. Thus they have different failure modes, and thus they're 
tested in two different test methods.
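   Concretely, pulling the expectations from the assertions in this class (the 
consumer names below are placeholders, not real test code):

   ```java
   // Corrupt first block: nothing is returned before the failure surfaces.
   assertEquals(0, consumerForCorruptFirstBlock.getItemsRead());
   // Corrupt second block: the intact first block is returned before the failure.
   assertEquals(1, consumerForCorruptSecondBlock.getItemsRead());
   // For onDiskSizeWithoutHeader corruption the messages differ as well:
   //   first block:  "Read invalid onDiskSizeWithHeader="
   //   second block: "Caller provided invalid onDiskSizeWithHeaderL="
   ```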



##########
hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockHeaderCorruption.java:
##########
@@ -0,0 +1,533 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+import java.time.Instant;
+import java.util.NoSuchElementException;
+import java.util.Random;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellBuilder;
+import org.apache.hadoop.hbase.CellBuilderFactory;
+import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExternalResource;
+import org.junit.rules.RuleChain;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestHFileBlockHeaderCorruption {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestHFileBlockHeaderCorruption.class);
+
+  private final HFileTestRule hFileTestRule;
+
+  @Rule
+  public final RuleChain ruleChain;
+
+  public TestHFileBlockHeaderCorruption() throws IOException {
+    TestName testName = new TestName();
+    hFileTestRule = new HFileTestRule(new HBaseTestingUtility(), testName);
+    ruleChain = RuleChain.outerRule(testName).around(hFileTestRule);
+  }
+
+  @Test
+  public void testBlockMagicCorruptionFirstBlock() throws Exception {
+    try (
+      HFileBlockChannelPositionIterator it = new 
HFileBlockChannelPositionIterator(hFileTestRule)) {
+      assertTrue(it.hasNext());
+      try (HFileBlockChannelPosition firstBlock = it.next()) {
+        Corrupter c = new Corrupter(firstBlock);
+        c.write(HFileBlock.Header.BLOCK_MAGIC_INDEX, 
ByteBuffer.wrap(Bytes.toBytes("_GARBAGE")));
+      }
+    }
+    try (
+      HFileBlockChannelPositionIterator it = new 
HFileBlockChannelPositionIterator(hFileTestRule)) {
+      CountingConsumer consumer = new CountingConsumer(it);
+      try {
+        consumer.readFully();
+        fail();
+      } catch (Exception e) {
+        assertTrue(e instanceof IOException);
+        assertEquals("Invalid HFile block magic: _GARBAGE", e.getMessage());
+      }
+      assertEquals(0, consumer.getItemsRead());
+    }
+  }
+
+  @Test
+  public void testBlockMagicCorruptionSecondBlock() throws Exception {
+    try (
+      HFileBlockChannelPositionIterator it = new 
HFileBlockChannelPositionIterator(hFileTestRule)) {
+      assertTrue(it.hasNext());
+      it.next();
+      assertTrue(it.hasNext());
+      try (HFileBlockChannelPosition secondBlock = it.next()) {
+        Corrupter c = new Corrupter(secondBlock);
+        c.write(HFileBlock.Header.BLOCK_MAGIC_INDEX, 
ByteBuffer.wrap(Bytes.toBytes("_GARBAGE")));
+      }
+    }
+    try (
+      HFileBlockChannelPositionIterator it = new 
HFileBlockChannelPositionIterator(hFileTestRule)) {
+      CountingConsumer consumer = new CountingConsumer(it);
+      try {
+        consumer.readFully();
+        fail();
+      } catch (Exception e) {
+        assertTrue(e instanceof IOException);
+        assertEquals("Invalid HFile block magic: _GARBAGE", e.getMessage());
+      }
+      assertEquals(1, consumer.getItemsRead());
+    }
+  }
+
+  @Test
+  public void testOnDiskSizeWithoutHeaderCorruptionFirstBlock() throws 
Exception {
+    HFileBlockChannelPosition firstBlock = null;
+    try {
+      try (HFileBlockChannelPositionIterator it =
+        new HFileBlockChannelPositionIterator(hFileTestRule)) {
+        assertTrue(it.hasNext());
+        firstBlock = it.next();
+      }
+
+      Corrupter c = new Corrupter(firstBlock);
+
+      logHeader(firstBlock);
+      c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX,
+        ByteBuffer.wrap(Bytes.toBytes(Integer.MIN_VALUE)));
+      logHeader(firstBlock);
+      try (HFileBlockChannelPositionIterator it =
+        new HFileBlockChannelPositionIterator(hFileTestRule)) {
+        CountingConsumer consumer = new CountingConsumer(it);
+        try {
+          consumer.readFully();
+          fail();
+        } catch (Exception e) {
+          assertTrue(e instanceof IOException);
+          assertTrue(e.getMessage().startsWith("Read invalid 
onDiskSizeWithHeader="));
+        }
+        assertEquals(0, consumer.getItemsRead());
+      }
+
+      c.restore();
+      c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX,
+        ByteBuffer.wrap(Bytes.toBytes(0)));
+      logHeader(firstBlock);
+      try (HFileBlockChannelPositionIterator it =
+        new HFileBlockChannelPositionIterator(hFileTestRule)) {
+        CountingConsumer consumer = new CountingConsumer(it);
+        try {
+          consumer.readFully();
+          fail();
+        } catch (Exception e) {
+          assertTrue(e instanceof IllegalArgumentException);
+          assertTrue(e.getMessage().startsWith("newLimit > capacity"));
+        }
+        assertEquals(0, consumer.getItemsRead());
+      }
+
+      c.restore();
+      c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX,
+        ByteBuffer.wrap(Bytes.toBytes(Integer.MAX_VALUE)));
+      logHeader(firstBlock);
+      try (HFileBlockChannelPositionIterator it =
+        new HFileBlockChannelPositionIterator(hFileTestRule)) {
+        CountingConsumer consumer = new CountingConsumer(it);
+        try {
+          consumer.readFully();
+          fail();
+        } catch (Exception e) {
+          assertTrue(e instanceof IOException);
+          assertTrue(e.getMessage().startsWith("Read invalid 
onDiskSizeWithHeader="));
+        }
+        assertEquals(0, consumer.getItemsRead());
+      }
+    } finally {
+      if (firstBlock != null) {
+        firstBlock.close();
+      }
+    }
+  }
+
+  @Test
+  public void testOnDiskSizeWithoutHeaderCorruptionSecondBlock() throws 
Exception {
+    HFileBlockChannelPosition secondBlock = null;
+    try {
+      try (HFileBlockChannelPositionIterator it =
+        new HFileBlockChannelPositionIterator(hFileTestRule)) {
+        assertTrue(it.hasNext());
+        it.next();
+        assertTrue(it.hasNext());
+        secondBlock = it.next();
+      }
+
+      Corrupter c = new Corrupter(secondBlock);
+
+      logHeader(secondBlock);
+      c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX,
+        ByteBuffer.wrap(Bytes.toBytes(Integer.MIN_VALUE)));
+      logHeader(secondBlock);
+      try (HFileBlockChannelPositionIterator it =
+        new HFileBlockChannelPositionIterator(hFileTestRule)) {
+        CountingConsumer consumer = new CountingConsumer(it);
+        try {
+          consumer.readFully();
+          fail();
+        } catch (Exception e) {
+          assertTrue(e instanceof IOException);
+          assertTrue(e.getMessage().startsWith("Caller provided invalid 
onDiskSizeWithHeaderL="));
+        }
+        assertEquals(1, consumer.getItemsRead());
+      }
+
+      c.restore();
+      c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX,
+        ByteBuffer.wrap(Bytes.toBytes(0)));
+      logHeader(secondBlock);
+      try (HFileBlockChannelPositionIterator it =
+        new HFileBlockChannelPositionIterator(hFileTestRule)) {
+        CountingConsumer consumer = new CountingConsumer(it);
+        try {
+          consumer.readFully();
+          fail();
+        } catch (Exception e) {
+          assertTrue(e instanceof IllegalArgumentException);
+          assertTrue(e.getMessage().startsWith("newLimit > capacity"));
+        }
+        assertEquals(1, consumer.getItemsRead());
+      }
+
+      c.restore();
+      c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX,
+        ByteBuffer.wrap(Bytes.toBytes(Integer.MAX_VALUE)));
+      logHeader(secondBlock);
+      try (HFileBlockChannelPositionIterator it =
+        new HFileBlockChannelPositionIterator(hFileTestRule)) {
+        CountingConsumer consumer = new CountingConsumer(it);
+        try {
+          consumer.readFully();
+          fail();
+        } catch (Exception e) {
+          assertTrue(e instanceof IOException);
+          assertTrue(e.getMessage().startsWith("Caller provided invalid 
onDiskSizeWithHeaderL="));
+        }
+        assertEquals(1, consumer.getItemsRead());
+      }
+    } finally {
+      if (secondBlock != null) {
+        secondBlock.close();
+      }
+    }
+  }
+
+  @Test
+  public void testUncompressedSizeWithoutHeaderCorruption() throws Exception {
+
+  }
+
+  @Test
+  public void testPrevBlockOffsetCorruption() throws Exception {
+
+  }
+
+  @Test
+  public void testChecksumTypeCorruption() throws Exception {
+
+  }
+
+  @Test
+  public void testBytesPerChecksumCorruption() throws Exception {
+
+  }
+
+  @Test
+  public void testOnDiskDataSizeWithHeaderCorruption() throws Exception {
+
+  }
+
+  private static void logHeader(HFileBlockChannelPosition hbcp) throws 
IOException {
+    ByteBuff buf = 
ByteBuff.wrap(ByteBuffer.allocate(HFileBlock.headerSize(true)));
+    hbcp.rewind();
+    assertEquals(buf.capacity(), buf.read(hbcp.getChannel()));
+    buf.rewind();
+    hbcp.rewind();
+    logHeader(buf);
+  }
+
+  private static void logHeader(ByteBuff buf) {
+    byte[] blockMagic = new byte[8];
+    buf.get(blockMagic);
+    int onDiskSizeWithoutHeader = buf.getInt();
+    int uncompressedSizeWithoutHeader = buf.getInt();
+    long prevBlockOffset = buf.getLong();
+    byte checksumType = buf.get();
+    int bytesPerChecksum = buf.getInt();
+    int onDiskDataSizeWithHeader = buf.getInt();
+    LOG.debug(
+      "blockMagic={}, onDiskSizeWithoutHeader={}, 
uncompressedSizeWithoutHeader={}, "
+        + "prevBlockOffset={}, checksumType={}, bytesPerChecksum={}, 
onDiskDataSizeWithHeader={}",
+      Bytes.toStringBinary(blockMagic), onDiskSizeWithoutHeader, 
uncompressedSizeWithoutHeader,
+      prevBlockOffset, checksumType, bytesPerChecksum, 
onDiskDataSizeWithHeader);
+  }
+
+  /**
+   * Data class to enable messing with the bytes behind an {@link HFileBlock}.
+   */
+  public static class HFileBlockChannelPosition implements Closeable {
+    private final SeekableByteChannel channel;
+    private final long position;
+
+    public HFileBlockChannelPosition(SeekableByteChannel channel, long 
position) {
+      this.channel = channel;
+      this.position = position;
+    }
+
+    public SeekableByteChannel getChannel() {
+      return channel;
+    }
+
+    public long getPosition() {
+      return position;
+    }
+
+    public void rewind() throws IOException {
+      channel.position(position);
+    }
+
+    @Override
+    public void close() throws IOException {
+      channel.close();
+    }
+  }
+
+  /**
+   * Reads blocks off of an {@link HFileBlockChannelPositionIterator}, 
counting them as it does.
+   */
+  public static class CountingConsumer {
+    private final HFileBlockChannelPositionIterator iterator;
+    private int itemsRead = 0;
+
+    public CountingConsumer(HFileBlockChannelPositionIterator iterator) {
+      this.iterator = iterator;
+    }
+
+    public int getItemsRead() {
+      return itemsRead;
+    }
+
+    public Object readFully() throws IOException {
+      Object val = null;
+      for (itemsRead = 0; iterator.hasNext(); itemsRead++) {
+        val = iterator.next();
+      }
+      return val;
+    }
+  }
+
+  /**
+   * A simplified wrapper over an {@link HFileBlock.BlockIterator} that looks 
a lot like an
+   * {@link java.util.Iterator}.
+   */
+  public static class HFileBlockChannelPositionIterator implements Closeable {
+
+    private final HFileTestRule hFileTestRule;
+    private final HFile.Reader reader;
+    private final HFileBlock.BlockIterator iter;
+    private HFileBlockChannelPosition current = null;
+
+    public HFileBlockChannelPositionIterator(HFileTestRule hFileTestRule) 
throws IOException {
+      Configuration conf = hFileTestRule.getConfiguration();
+      HFileSystem hfs = hFileTestRule.getHFileSystem();
+      Path hfsPath = hFileTestRule.getPath();
+
+      HFile.Reader reader = null;
+      HFileBlock.BlockIterator iter = null;
+      try {
+        reader = HFile.createReader(hfs, hfsPath, CacheConfig.DISABLED, true, 
conf);
+        HFileBlock.FSReader fsreader = reader.getUncachedBlockReader();
+        iter = fsreader.blockRange(0, hfs.getFileStatus(hfsPath).getLen());
+      } catch (IOException e) {
+        if (reader != null) {
+          closeQuietly(reader::close);
+        }
+        throw e;
+      }
+
+      this.hFileTestRule = hFileTestRule;
+      this.reader = reader;
+      this.iter = iter;
+    }
+
+    public boolean hasNext() throws IOException {
+      HFileBlock next = iter.nextBlock();
+      SeekableByteChannel channel = hFileTestRule.getRWChannel();
+      if (next != null) {
+        current = new HFileBlockChannelPosition(channel, next.getOffset());
+      }
+      return next != null;
+    }
+
+    public HFileBlockChannelPosition next() {
+      if (current == null) {
+        throw new NoSuchElementException();
+      }
+      HFileBlockChannelPosition ret = current;
+      current = null;
+      return ret;
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (current != null) {
+        closeQuietly(current::close);
+      }
+      closeQuietly(reader::close);
+    }
+
+    @FunctionalInterface
+    private interface CloseMethod {
+      void run() throws IOException;
+    }
+
+    private static <T extends Throwable> void closeQuietly(CloseMethod 
closeMethod) {
+      try {
+        closeMethod.run();
+      } catch (Throwable e) {
+        LOG.debug("Ignoring thrown exception.", e);
+      }
+    }
+  }
+
+  /**
+   * Enables writing and rewriting portions of the file backing an {@link 
HFileBlock}.

Review Comment:
   This implementation assumes a `LocalFileSystem`. I haven't yet looked at how 
to corrupt a header underneath a data node.
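   For reference, a minimal sketch of the direct-file corruption this relies on, 
assuming the HFile sits on a `LocalFileSystem`. The path and block offset below 
are placeholders; the rule owns the real file location.

   ```java
   // Sketch: open the HFile's backing local file with NIO and overwrite the
   // block magic in place. localPath and blockOffset are hypothetical inputs.
   java.nio.file.Path localPath = FileSystems.getDefault().getPath("/tmp/example.hfile");
   long blockOffset = 0L;
   try (SeekableByteChannel channel =
     Files.newByteChannel(localPath, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
     channel.position(blockOffset + HFileBlock.Header.BLOCK_MAGIC_INDEX);
     channel.write(ByteBuffer.wrap(Bytes.toBytes("_GARBAGE")));
   }
   ```

   Doing the same underneath a data node would presumably mean editing the block 
replica files on the data node's local disk, which is a much heavier setup.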



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java:
##########
@@ -1721,14 +1765,36 @@ protected HFileBlock 
readBlockDataInternal(FSDataInputStream is, long offset,
         if (headerBuf == null) {
           headerBuf = onDiskBlock.duplicate().position(0).limit(hdrSize);
         }
-        // Do a few checks before we go instantiate HFileBlock.
-        assert onDiskSizeWithHeader > this.hdrSize;
-        verifyOnDiskSizeMatchesHeader(onDiskSizeWithHeader, headerBuf, offset, 
checksumSupport);
+
         ByteBuff curBlock = 
onDiskBlock.duplicate().position(0).limit(onDiskSizeWithHeader);
         // Verify checksum of the data before using it for building HFileBlock.
         if (verifyChecksum && !validateChecksum(offset, curBlock, hdrSize)) {
+          invalidateNextBlockHeader();
+          span.addEvent("Falling back to HDFS checksumming.", 
attributesBuilder.build());
           return null;
         }
+
+        // TODO: is this check necessary or can we proceed with a provided 
value regardless of

Review Comment:
   Tracing the call sites, I see four sources of this parameter.
   
   1. Caller doesn't know how big the block is, so it provides `-1`.
   2. Caller is seeking to this block via a block index reference, and provides 
an expected block size based on the value that was written to the index. I 
think those reads are covered by checksum validation, so we can trust this 
source.
   3. In the previous call, the `readBlockDataInternal` method over-read the 
next block's header and took a small detour to [read 
out](https://github.com/apache/hbase/blob/4c1fd0d/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java#L1719-L1720)
 and save off the value as a local variable. This value is not validated after 
the buffer read. Looks like something else that I should invalidate in 
`invalidateNextBlockHeader()`. We should be skeptical of values with this 
origin story.
   4. Caller has read the previous block out of blockcache and so has a hint for 
the next block's size as provided by the cached previous block. This seems to 
only happen when we're working with an off-heap BlockCache, like BucketCache or 
Memcached. See references to `HFileBlock.BLOCK_METADATA_SPACE` and the 
`Cacheable` interface. The value seems to originate from the same 
next-block-header over-read that populates (3), and so we should be skeptical 
of these values as well.
   
   So in today's code, I'd say we don't trust this value very often -- what 
percentage of block reads are directly driven by a reference from the block 
index? Maybe after this patch, with more explicit care to invalidate the next 
header buffer when things look bad, and if we can find a way to mark the 
value's origin as the checksum-validated read of the block index, we could 
probably trust it a lot, though it'll be absent more often. As of now, I think 
the common case is that the value comes from the pre-read of the next block's 
header, which means we're just comparing two values that have identical origin 
and it's a waste of CPU.
   
   Also, we rely on this unverified value in order to perform checksum 
validation. Reading down into 
[ChecksumUtil](https://github.com/apache/hbase/blob/1aea663c6de4c08f0b2a2d4b2ca788772dc0b686/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ChecksumUtil.java#L172-L178),
 it looks like we again read values from the unverified block header in order 
to perform checksum validation over data that includes the header value we're 
referencing.
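   Not the patch's actual `checkOnDiskSizeWithHeader`, but a minimal sketch of 
the kind of bound I have in mind for a caller-provided size; the name and the 
limits here are assumptions:

   ```java
   // Hypothetical helper: reject obviously bogus sizes before they are used as
   // buffer limits. -1 is the conventional "size unknown" sentinel.
   static boolean isPlausibleOnDiskSizeWithHeader(long sizeL, int hdrSize) {
     if (sizeL == -1) {
       return true; // unknown; resolved later by reading the header itself
     }
     // Must exceed the header it claims to include and must fit in an int. A
     // tighter upper bound tied to a multiple of fileContext.getBlocksize(), as
     // the TODO in the hunk suggests, could be layered on top.
     return sizeL > hdrSize && sizeL < Integer.MAX_VALUE;
   }
   ```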



##########
hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockHeaderCorruption.java:
##########
@@ -0,0 +1,533 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+import java.time.Instant;
+import java.util.NoSuchElementException;
+import java.util.Random;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellBuilder;
+import org.apache.hadoop.hbase.CellBuilderFactory;
+import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExternalResource;
+import org.junit.rules.RuleChain;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestHFileBlockHeaderCorruption {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestHFileBlockHeaderCorruption.class);
+
+  private final HFileTestRule hFileTestRule;
+
+  @Rule
+  public final RuleChain ruleChain;
+
+  public TestHFileBlockHeaderCorruption() throws IOException {
+    TestName testName = new TestName();
+    hFileTestRule = new HFileTestRule(new HBaseTestingUtility(), testName);
+    ruleChain = RuleChain.outerRule(testName).around(hFileTestRule);
+  }
+
+  @Test
+  public void testBlockMagicCorruptionFirstBlock() throws Exception {
+    try (
+      HFileBlockChannelPositionIterator it = new 
HFileBlockChannelPositionIterator(hFileTestRule)) {
+      assertTrue(it.hasNext());
+      try (HFileBlockChannelPosition firstBlock = it.next()) {
+        Corrupter c = new Corrupter(firstBlock);
+        c.write(HFileBlock.Header.BLOCK_MAGIC_INDEX, 
ByteBuffer.wrap(Bytes.toBytes("_GARBAGE")));
+      }
+    }
+    try (
+      HFileBlockChannelPositionIterator it = new 
HFileBlockChannelPositionIterator(hFileTestRule)) {
+      CountingConsumer consumer = new CountingConsumer(it);
+      try {
+        consumer.readFully();
+        fail();
+      } catch (Exception e) {
+        assertTrue(e instanceof IOException);
+        assertEquals("Invalid HFile block magic: _GARBAGE", e.getMessage());
+      }
+      assertEquals(0, consumer.getItemsRead());
+    }
+  }
+
+  @Test
+  public void testBlockMagicCorruptionSecondBlock() throws Exception {
+    try (
+      HFileBlockChannelPositionIterator it = new 
HFileBlockChannelPositionIterator(hFileTestRule)) {
+      assertTrue(it.hasNext());
+      it.next();
+      assertTrue(it.hasNext());
+      try (HFileBlockChannelPosition secondBlock = it.next()) {
+        Corrupter c = new Corrupter(secondBlock);
+        c.write(HFileBlock.Header.BLOCK_MAGIC_INDEX, 
ByteBuffer.wrap(Bytes.toBytes("_GARBAGE")));
+      }
+    }
+    try (
+      HFileBlockChannelPositionIterator it = new 
HFileBlockChannelPositionIterator(hFileTestRule)) {
+      CountingConsumer consumer = new CountingConsumer(it);
+      try {
+        consumer.readFully();
+        fail();
+      } catch (Exception e) {
+        assertTrue(e instanceof IOException);
+        assertEquals("Invalid HFile block magic: _GARBAGE", e.getMessage());
+      }
+      assertEquals(1, consumer.getItemsRead());
+    }
+  }
+
+  @Test
+  public void testOnDiskSizeWithoutHeaderCorruptionFirstBlock() throws 
Exception {
+    HFileBlockChannelPosition firstBlock = null;
+    try {
+      try (HFileBlockChannelPositionIterator it =
+        new HFileBlockChannelPositionIterator(hFileTestRule)) {
+        assertTrue(it.hasNext());
+        firstBlock = it.next();
+      }
+
+      Corrupter c = new Corrupter(firstBlock);
+
+      logHeader(firstBlock);
+      c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX,
+        ByteBuffer.wrap(Bytes.toBytes(Integer.MIN_VALUE)));
+      logHeader(firstBlock);
+      try (HFileBlockChannelPositionIterator it =
+        new HFileBlockChannelPositionIterator(hFileTestRule)) {
+        CountingConsumer consumer = new CountingConsumer(it);
+        try {
+          consumer.readFully();
+          fail();
+        } catch (Exception e) {
+          assertTrue(e instanceof IOException);
+          assertTrue(e.getMessage().startsWith("Read invalid 
onDiskSizeWithHeader="));
+        }
+        assertEquals(0, consumer.getItemsRead());
+      }
+
+      c.restore();
+      c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX,
+        ByteBuffer.wrap(Bytes.toBytes(0)));
+      logHeader(firstBlock);
+      try (HFileBlockChannelPositionIterator it =
+        new HFileBlockChannelPositionIterator(hFileTestRule)) {
+        CountingConsumer consumer = new CountingConsumer(it);
+        try {
+          consumer.readFully();
+          fail();
+        } catch (Exception e) {
+          assertTrue(e instanceof IllegalArgumentException);
+          assertTrue(e.getMessage().startsWith("newLimit > capacity"));
+        }
+        assertEquals(0, consumer.getItemsRead());
+      }
+
+      c.restore();
+      c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX,
+        ByteBuffer.wrap(Bytes.toBytes(Integer.MAX_VALUE)));
+      logHeader(firstBlock);
+      try (HFileBlockChannelPositionIterator it =
+        new HFileBlockChannelPositionIterator(hFileTestRule)) {
+        CountingConsumer consumer = new CountingConsumer(it);
+        try {
+          consumer.readFully();
+          fail();
+        } catch (Exception e) {
+          assertTrue(e instanceof IOException);
+          assertTrue(e.getMessage().startsWith("Read invalid 
onDiskSizeWithHeader="));
+        }
+        assertEquals(0, consumer.getItemsRead());
+      }
+    } finally {
+      if (firstBlock != null) {
+        firstBlock.close();
+      }
+    }
+  }
+
+  @Test
+  public void testOnDiskSizeWithoutHeaderCorruptionSecondBlock() throws 
Exception {
+    HFileBlockChannelPosition secondBlock = null;
+    try {
+      try (HFileBlockChannelPositionIterator it =
+        new HFileBlockChannelPositionIterator(hFileTestRule)) {
+        assertTrue(it.hasNext());
+        it.next();
+        assertTrue(it.hasNext());
+        secondBlock = it.next();
+      }
+
+      Corrupter c = new Corrupter(secondBlock);
+
+      logHeader(secondBlock);
+      c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX,
+        ByteBuffer.wrap(Bytes.toBytes(Integer.MIN_VALUE)));
+      logHeader(secondBlock);
+      try (HFileBlockChannelPositionIterator it =
+        new HFileBlockChannelPositionIterator(hFileTestRule)) {
+        CountingConsumer consumer = new CountingConsumer(it);
+        try {
+          consumer.readFully();
+          fail();
+        } catch (Exception e) {
+          assertTrue(e instanceof IOException);
+          assertTrue(e.getMessage().startsWith("Caller provided invalid onDiskSizeWithHeaderL="));
+        }
+        assertEquals(1, consumer.getItemsRead());
+      }
+
+      c.restore();
+      c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX,
+        ByteBuffer.wrap(Bytes.toBytes(0)));
+      logHeader(secondBlock);
+      try (HFileBlockChannelPositionIterator it =
+        new HFileBlockChannelPositionIterator(hFileTestRule)) {
+        CountingConsumer consumer = new CountingConsumer(it);
+        try {
+          consumer.readFully();
+          fail();
+        } catch (Exception e) {
+          assertTrue(e instanceof IllegalArgumentException);
+          assertTrue(e.getMessage().startsWith("newLimit > capacity"));
+        }
+        assertEquals(1, consumer.getItemsRead());
+      }
+
+      c.restore();
+      c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX,
+        ByteBuffer.wrap(Bytes.toBytes(Integer.MAX_VALUE)));
+      logHeader(secondBlock);
+      try (HFileBlockChannelPositionIterator it =
+        new HFileBlockChannelPositionIterator(hFileTestRule)) {
+        CountingConsumer consumer = new CountingConsumer(it);
+        try {
+          consumer.readFully();
+          fail();
+        } catch (Exception e) {
+          assertTrue(e instanceof IOException);
+          assertTrue(e.getMessage().startsWith("Caller provided invalid onDiskSizeWithHeaderL="));
+        }
+        assertEquals(1, consumer.getItemsRead());
+      }
+    } finally {
+      if (secondBlock != null) {
+        secondBlock.close();
+      }
+    }
+  }
+
+  @Test
+  public void testUncompressedSizeWithoutHeaderCorruption() throws Exception {
+
+  }
+
+  @Test
+  public void testPrevBlockOffsetCorruption() throws Exception {
+
+  }
+
+  @Test
+  public void testChecksumTypeCorruption() throws Exception {
+
+  }
+
+  @Test
+  public void testBytesPerChecksumCorruption() throws Exception {
+
+  }
+
+  @Test
+  public void testOnDiskDataSizeWithHeaderCorruption() throws Exception {
+
+  }
+
+  private static void logHeader(HFileBlockChannelPosition hbcp) throws IOException {
+    ByteBuff buf = ByteBuff.wrap(ByteBuffer.allocate(HFileBlock.headerSize(true)));
+    hbcp.rewind();
+    assertEquals(buf.capacity(), buf.read(hbcp.getChannel()));
+    buf.rewind();
+    hbcp.rewind();
+    logHeader(buf);
+  }
+
+  private static void logHeader(ByteBuff buf) {
+    byte[] blockMagic = new byte[8];
+    buf.get(blockMagic);
+    int onDiskSizeWithoutHeader = buf.getInt();
+    int uncompressedSizeWithoutHeader = buf.getInt();
+    long prevBlockOffset = buf.getLong();
+    byte checksumType = buf.get();
+    int bytesPerChecksum = buf.getInt();
+    int onDiskDataSizeWithHeader = buf.getInt();
+    LOG.debug(
+      "blockMagic={}, onDiskSizeWithoutHeader={}, 
uncompressedSizeWithoutHeader={}, "
+        + "prevBlockOffset={}, checksumType={}, bytesPerChecksum={}, 
onDiskDataSizeWithHeader={}",
+      Bytes.toStringBinary(blockMagic), onDiskSizeWithoutHeader, 
uncompressedSizeWithoutHeader,
+      prevBlockOffset, checksumType, bytesPerChecksum, 
onDiskDataSizeWithHeader);
+  }
+
+  /**
+   * Data class to enable messing with the bytes behind an {@link HFileBlock}.
+   */
+  public static class HFileBlockChannelPosition implements Closeable {
+    private final SeekableByteChannel channel;
+    private final long position;
+
+    public HFileBlockChannelPosition(SeekableByteChannel channel, long position) {
+      this.channel = channel;
+      this.position = position;
+    }
+
+    public SeekableByteChannel getChannel() {
+      return channel;
+    }
+
+    public long getPosition() {
+      return position;
+    }
+
+    public void rewind() throws IOException {
+      channel.position(position);
+    }
+
+    @Override
+    public void close() throws IOException {
+      channel.close();
+    }
+  }
+
+  /**
+   * Reads blocks off of an {@link HFileBlockChannelPositionIterator}, counting them as it does.
+   */
+  public static class CountingConsumer {
+    private final HFileBlockChannelPositionIterator iterator;
+    private int itemsRead = 0;
+
+    public CountingConsumer(HFileBlockChannelPositionIterator iterator) {
+      this.iterator = iterator;
+    }
+
+    public int getItemsRead() {
+      return itemsRead;
+    }
+
+    public Object readFully() throws IOException {
+      Object val = null;
+      for (itemsRead = 0; iterator.hasNext(); itemsRead++) {
+        val = iterator.next();
+      }
+      return val;
+    }
+  }
+
+  /**
+   * A simplified wrapper over an {@link HFileBlock.BlockIterator} that looks a lot like an
+   * {@link java.util.Iterator}.
+   */
+  public static class HFileBlockChannelPositionIterator implements Closeable {
+
+    private final HFileTestRule hFileTestRule;
+    private final HFile.Reader reader;
+    private final HFileBlock.BlockIterator iter;
+    private HFileBlockChannelPosition current = null;
+
+    public HFileBlockChannelPositionIterator(HFileTestRule hFileTestRule) throws IOException {
+      Configuration conf = hFileTestRule.getConfiguration();
+      HFileSystem hfs = hFileTestRule.getHFileSystem();
+      Path hfsPath = hFileTestRule.getPath();
+
+      HFile.Reader reader = null;
+      HFileBlock.BlockIterator iter = null;
+      try {
+        reader = HFile.createReader(hfs, hfsPath, CacheConfig.DISABLED, true, conf);
+        HFileBlock.FSReader fsreader = reader.getUncachedBlockReader();
+        iter = fsreader.blockRange(0, hfs.getFileStatus(hfsPath).getLen());
+      } catch (IOException e) {
+        if (reader != null) {
+          closeQuietly(reader::close);
+        }
+        throw e;
+      }
+
+      this.hFileTestRule = hFileTestRule;
+      this.reader = reader;
+      this.iter = iter;
+    }
+
+    public boolean hasNext() throws IOException {
+      HFileBlock next = iter.nextBlock();
+      if (next != null) {
+        // Only open a channel when there is a block to wrap; otherwise the channel would leak.
+        SeekableByteChannel channel = hFileTestRule.getRWChannel();
+        current = new HFileBlockChannelPosition(channel, next.getOffset());
+      }
+      return next != null;
+    }
+
+    public HFileBlockChannelPosition next() {
+      if (current == null) {
+        throw new NoSuchElementException();
+      }
+      HFileBlockChannelPosition ret = current;
+      current = null;
+      return ret;
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (current != null) {
+        closeQuietly(current::close);
+      }
+      closeQuietly(reader::close);
+    }
+
+    @FunctionalInterface
+    private interface CloseMethod {
+      void run() throws IOException;
+    }
+
+    private static void closeQuietly(CloseMethod closeMethod) {
+      try {
+        closeMethod.run();
+      } catch (Throwable e) {
+        LOG.debug("Ignoring thrown exception.", e);
+      }
+    }
+  }
+
+  /**
+   * Enables writing and rewriting portions of the file backing an {@link HFileBlock}.
+   */
+  public static class Corrupter {
+
+    private final HFileBlockChannelPosition channelAndPosition;
+    private final ByteBuffer originalHeader;
+
+    public Corrupter(HFileBlockChannelPosition channelAndPosition) throws IOException {
+      this.channelAndPosition = channelAndPosition;
+      this.originalHeader = readHeaderData(channelAndPosition);
+    }
+
+    private static ByteBuffer readHeaderData(HFileBlockChannelPosition channelAndPosition)
+      throws IOException {
+      SeekableByteChannel channel = channelAndPosition.getChannel();
+      ByteBuffer originalHeader = ByteBuffer.allocate(HFileBlock.headerSize(true));
+      channelAndPosition.rewind();
+      channel.read(originalHeader);
+      return originalHeader;
+    }
+
+    public void write(int offset, ByteBuffer src) throws IOException {
+      SeekableByteChannel channel = channelAndPosition.getChannel();
+      long position = channelAndPosition.getPosition();
+      channel.position(position + offset);
+      channel.write(src);
+    }
+
+    public void restore() throws IOException {
+      SeekableByteChannel channel = channelAndPosition.getChannel();
+      originalHeader.rewind();
+      channelAndPosition.rewind();
+      assertEquals(originalHeader.capacity(), channel.write(originalHeader));
+    }
+  }
+
+  public static class HFileTestRule extends ExternalResource {
+
+    private final HBaseTestingUtility testingUtility;
+    private final HFileSystem hfs;
+    private final HFileContext context;
+    private final TestName testName;
+    private Path path;
+
+    public HFileTestRule(HBaseTestingUtility testingUtility, TestName testName) throws IOException {
+      this.testingUtility = testingUtility;
+      this.testName = testName;
+      this.hfs = (HFileSystem) HFileSystem.get(testingUtility.getConfiguration());
+      this.context =
+        new HFileContextBuilder().withBlockSize(4 * 1024).withHBaseCheckSum(true).build();
+    }
+
+    public Configuration getConfiguration() {
+      return testingUtility.getConfiguration();
+    }
+
+    public HFileSystem getHFileSystem() {
+      return hfs;
+    }
+
+    public HFileContext getHFileContext() {
+      return context;
+    }
+
+    public Path getPath() {
+      return path;
+    }
+
+    public SeekableByteChannel getRWChannel() throws IOException {
+      java.nio.file.Path p = FileSystems.getDefault().getPath(path.toString());
+      return Files.newByteChannel(p, StandardOpenOption.READ, StandardOpenOption.WRITE,
+        StandardOpenOption.DSYNC);
+    }
+
+    @Override
+    protected void before() throws Throwable {
+      this.path = new Path(testingUtility.getDataTestDirOnTestFS(), testName.getMethodName());
+      HFile.WriterFactory factory =
+        HFile.getWriterFactory(testingUtility.getConfiguration(), CacheConfig.DISABLED)
+          .withPath(hfs, path).withFileContext(context);
+
+      CellBuilder cellBuilder = CellBuilderFactory.create(CellBuilderType.DEEP_COPY);
+      Random rand = new Random(Instant.now().toEpochMilli());
+      byte[] family = Bytes.toBytes("f");
+      try (HFile.Writer writer = factory.create()) {
+        for (int i = 0; i < 40; i++) {
+          byte[] row = RandomKeyValueUtil.randomOrderedFixedLengthKey(rand, i, 100);

Review Comment:
   All this could be encapsulated in a RandomCellGenerator kind of class.
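   
   A minimal sketch of what such a class might look like, reusing the CellBuilder and
   RandomKeyValueUtil helpers the test already imports; the class name, constructor shape, and
   the qualifier/value helpers used here are illustrative assumptions, not something this patch
   provides:
   
   ```java
   // Hypothetical sketch only, not part of the patch under review.
   public static class RandomCellGenerator {
     private final CellBuilder cellBuilder = CellBuilderFactory.create(CellBuilderType.DEEP_COPY);
     private final Random rand;
     private final byte[] family;
     private int index = 0;
   
     RandomCellGenerator(long seed, byte[] family) {
       this.rand = new Random(seed);
       this.family = family;
     }
   
     /** Builds a Put cell with a random ordered fixed-length row key and a random value. */
     public Cell next() {
       byte[] row = RandomKeyValueUtil.randomOrderedFixedLengthKey(rand, index++, 100);
       cellBuilder.clear();
       return cellBuilder.setType(Cell.Type.Put).setRow(row).setFamily(family)
         .setQualifier(RandomKeyValueUtil.randomRowOrQualifier(rand))
         .setValue(RandomKeyValueUtil.randomValue(rand)).build();
     }
   }
   ```
   
   The write loop in before() would then reduce to something like
   `writer.append(generator.next())`, keeping the key/value generation details out of the rule.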


