This is an automated email from the ASF dual-hosted git repository. vbalaji pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push: new 9d46ce3 [HUDI -409] Match header and footer block length to improve corrupted block detection (#1332) 9d46ce3 is described below commit 9d46ce380a3929605b3838238e8aa07a9918ab7a Author: Ramachandran M S <mramachand...@uber.com> AuthorDate: Tue Mar 3 13:26:54 2020 -0800 [HUDI -409] Match header and footer block length to improve corrupted block detection (#1332) --- .../hudi/common/table/log/HoodieLogFileReader.java | 16 +++++++++++++++- .../common/table/log/HoodieLogFormatWriter.java | 3 +++ .../hudi/common/table/log/TestHoodieLogFormat.java | 21 ++++++++++++++++++--- 3 files changed, 36 insertions(+), 4 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java index 40a5243..53627e6 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java @@ -231,6 +231,7 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader { inputStream.seek(currentPos + blocksize); } } catch (EOFException e) { + LOG.info("Found corrupted block in file " + logFile + " with block size(" + blocksize + ") running past EOF"); // this is corrupt // This seek is required because contract of seek() is different for naked DFSInputStream vs BufferedFSInputStream // release-3.1.0-RC1/DFSInputStream.java#L1455 @@ -239,12 +240,26 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader { return true; } + // check if the blocksize mentioned in the footer is the same as the header; by seeking back the length of a long + // the backward seek does not incur additional IO as {@link org.apache.hadoop.hdfs.DFSInputStream#seek()} + // only moves the index. actual IO happens on the next read operation + inputStream.seek(inputStream.getPos() - Long.BYTES); + // Block size in the footer includes the magic header, which the header does not include. + // So we have to shorten the footer block size by the size of magic hash + long blockSizeFromFooter = inputStream.readLong() - MAGIC_BUFFER.length; + if (blocksize != blockSizeFromFooter) { + LOG.info("Found corrupted block in file " + logFile + ". Header block size(" + blocksize + + ") did not match the footer block size(" + blockSizeFromFooter + ")"); + inputStream.seek(currentPos); + return true; + } try { readMagic(); // all good - either we found the sync marker or EOF. Reset position and continue return false; } catch (CorruptedLogFileException e) { // This is a corrupted block + LOG.info("Found corrupted block in file " + logFile + ". No magic hash found right after footer block size entry"); return true; } finally { inputStream.seek(currentPos); @@ -310,7 +325,6 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader { } private boolean hasNextMagic() throws IOException { - long pos = inputStream.getPos(); // 1. Read magic header from the start of the block inputStream.readFully(MAGIC_BUFFER, 0, 6); return Arrays.equals(MAGIC_BUFFER, HoodieLogFormat.MAGIC); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java index 201b879..1b2e188 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java @@ -158,6 +158,9 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer { this.output.write(footerBytes); // 9. Write the total size of the log block (including magic) which is everything written // until now (for reverse pointer) + // Update: this information is now used in determining if a block is corrupt by comparing to the + // block size in header. This change assumes that the block size will be the last data written + // to a block. Read will break if any data is written past this point for a block. this.output.writeLong(this.output.size() - currentSize); // Flush every block to disk flush(); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/log/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/table/log/TestHoodieLogFormat.java index b896d1f..17d04da 100755 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/log/TestHoodieLogFormat.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/log/TestHoodieLogFormat.java @@ -491,16 +491,27 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { // create a block with outputStream.write(HoodieLogFormat.MAGIC); // Write out a length that does not confirm with the content - outputStream.writeLong(1000); + outputStream.writeLong(474); outputStream.writeInt(HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal()); outputStream.writeInt(HoodieLogFormat.CURRENT_VERSION); // Write out a length that does not confirm with the content - outputStream.writeLong(500); - // Write out some bytes + outputStream.writeLong(400); + // Write out incomplete content outputStream.write("something-random".getBytes()); outputStream.flush(); outputStream.close(); + // Append a proper block that is of the missing length of the corrupted block + writer = + HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION) + .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build(); + records = SchemaTestUtil.generateTestRecords(0, 10); + header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString()); + dataBlock = new HoodieAvroDataBlock(records, header); + writer = writer.appendBlock(dataBlock); + writer.close(); + + // First round of reads - we should be able to read the first block and then EOF Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema()); assertTrue("First block should be available", reader.hasNext()); @@ -508,6 +519,8 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { assertTrue("We should have corrupted block next", reader.hasNext()); HoodieLogBlock block = reader.next(); assertEquals("The read block should be a corrupt block", HoodieLogBlockType.CORRUPT_BLOCK, block.getBlockType()); + assertTrue("Third block should be available", reader.hasNext()); + reader.next(); assertFalse("There should be no more block left", reader.hasNext()); reader.close(); @@ -543,6 +556,8 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { reader.next(); assertTrue("We should get the 1st corrupted block next", reader.hasNext()); reader.next(); + assertTrue("Third block should be available", reader.hasNext()); + reader.next(); assertTrue("We should get the 2nd corrupted block next", reader.hasNext()); block = reader.next(); assertEquals("The read block should be a corrupt block", HoodieLogBlockType.CORRUPT_BLOCK, block.getBlockType());