This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d46ce3  [HUDI -409] Match header and footer block length to improve corrupted block detection (#1332)
9d46ce3 is described below

commit 9d46ce380a3929605b3838238e8aa07a9918ab7a
Author: Ramachandran M S <mramachand...@uber.com>
AuthorDate: Tue Mar 3 13:26:54 2020 -0800

    [HUDI -409] Match header and footer block length to improve corrupted block detection (#1332)
---
 .../hudi/common/table/log/HoodieLogFileReader.java  | 16 +++++++++++++++-
 .../common/table/log/HoodieLogFormatWriter.java     |  3 +++
 .../hudi/common/table/log/TestHoodieLogFormat.java  | 21 ++++++++++++++++++---
 3 files changed, 36 insertions(+), 4 deletions(-)
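
For context, the fix relies on each log block recording its length twice: once in a size field in the header just after the magic marker, and once as a reverse-pointer long written as the very last bytes of the block. A minimal sketch of the invariant the reader now verifies (class and method names here are illustrative, not Hudi's; the 6-byte magic length matches the readFully(MAGIC_BUFFER, 0, 6) call in the reader below):

    // Illustrative only: captures the header/footer length relationship
    // this commit checks, not Hudi's actual classes.
    final class LogBlockLengths {
      // HoodieLogFormat.MAGIC is 6 bytes long.
      static final int MAGIC_LENGTH = 6;

      // The footer's reverse pointer counts everything written for the
      // block including the magic; the header's size field excludes it.
      static boolean lengthsConsistent(long headerSize, long footerPointer) {
        return headerSize == footerPointer - MAGIC_LENGTH;
      }
    }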

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
index 40a5243..53627e6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
@@ -231,6 +231,7 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader {
         inputStream.seek(currentPos + blocksize);
       }
     } catch (EOFException e) {
+      LOG.info("Found corrupted block in file " + logFile + " with block 
size(" + blocksize + ") running past EOF");
       // this is corrupt
      // This seek is required because contract of seek() is different for naked DFSInputStream vs BufferedFSInputStream
       // release-3.1.0-RC1/DFSInputStream.java#L1455
@@ -239,12 +240,26 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader {
       return true;
     }
 
+    // Check if the block size mentioned in the footer is the same as the one in the header, by seeking back the length of a long.
+    // The backward seek does not incur additional IO, as {@link org.apache.hadoop.hdfs.DFSInputStream#seek()}
+    // only moves the index; actual IO happens on the next read operation.
+    inputStream.seek(inputStream.getPos() - Long.BYTES);
+    // Block size in the footer includes the magic header, which the header does not include.
+    // So we have to shorten the footer block size by the size of magic hash
+    long blockSizeFromFooter = inputStream.readLong() - MAGIC_BUFFER.length;
+    if (blocksize != blockSizeFromFooter) {
+      LOG.info("Found corrupted block in file " + logFile + ". Header block 
size(" + blocksize
+              + ") did not match the footer block size(" + blockSizeFromFooter 
+ ")");
+      inputStream.seek(currentPos);
+      return true;
+    }
     try {
       readMagic();
      // all good - either we found the sync marker or EOF. Reset position and continue
       return false;
     } catch (CorruptedLogFileException e) {
       // This is a corrupted block
+      LOG.info("Found corrupted block in file " + logFile + ". No magic hash 
found right after footer block size entry");
       return true;
     } finally {
       inputStream.seek(currentPos);
@@ -310,7 +325,6 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader {
   }
 
   private boolean hasNextMagic() throws IOException {
-    long pos = inputStream.getPos();
     // 1. Read magic header from the start of the block
     inputStream.readFully(MAGIC_BUFFER, 0, 6);
     return Arrays.equals(MAGIC_BUFFER, HoodieLogFormat.MAGIC);
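
To make the new check above concrete: when it runs, the stream is positioned just past the block's trailing reverse pointer, so stepping back one long and re-reading it retrieves the footer length without extra IO. A rough standalone equivalent, with java.io.RandomAccessFile standing in for the FSDataInputStream the reader actually uses (a sketch under that assumption, not Hudi's code):

    import java.io.IOException;
    import java.io.RandomAccessFile;

    class FooterLengthProbe {
      static final int MAGIC_LENGTH = 6; // length of HoodieLogFormat.MAGIC

      // Assumes `in` is positioned just past the block's trailing reverse
      // pointer. Returns true when the header and footer lengths disagree,
      // i.e. the block should be treated as corrupt.
      static boolean lengthsMismatch(RandomAccessFile in, long headerBlockSize)
          throws IOException {
        in.seek(in.getFilePointer() - Long.BYTES); // step back over the footer long
        long blockSizeFromFooter = in.readLong() - MAGIC_LENGTH; // footer counts the magic too
        return headerBlockSize != blockSizeFromFooter;
      }
    }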
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
index 201b879..1b2e188 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
@@ -158,6 +158,9 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
     this.output.write(footerBytes);
     // 9. Write the total size of the log block (including magic) which is everything written
     // until now (for reverse pointer)
+    // Update: this information is now used in determining if a block is corrupt by comparing to the
+    //   block size in header. This change assumes that the block size will be the last data written
+    //   to a block. Read will break if any data is written past this point for a block.
     this.output.writeLong(this.output.size() - currentSize);
     // Flush every block to disk
     flush();
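
On the write side, the comment above turns the reverse pointer into a checked invariant: it must be the last field of the block, and its value must be everything written for the block up to that point, magic included. A toy writer showing that shape, with DataOutputStream standing in for Hudi's output stream and the field layout simplified to magic, one size long, and content (illustrative only):

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    class ToyBlockWriter {
      // Assumed 6-byte magic marker, mirroring HoodieLogFormat.MAGIC.
      static final byte[] MAGIC = "#HUDI#".getBytes();

      static byte[] writeBlock(byte[] content) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.write(MAGIC);
        // Header size: the block minus the magic (this long plus the content),
        // so it equals the footer value minus the magic length.
        out.writeLong(Long.BYTES + content.length);
        out.write(content);
        // Reverse pointer: everything written so far, magic included. It must
        // remain the last bytes of the block or the reader's seek-back breaks.
        out.writeLong(out.size());
        return buf.toByteArray();
      }
    }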
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/log/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/table/log/TestHoodieLogFormat.java
index b896d1f..17d04da 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/log/TestHoodieLogFormat.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/log/TestHoodieLogFormat.java
@@ -491,16 +491,27 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     // create a block with
     outputStream.write(HoodieLogFormat.MAGIC);
     // Write out a length that does not conform with the content
-    outputStream.writeLong(1000);
+    outputStream.writeLong(474);
     outputStream.writeInt(HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal());
     outputStream.writeInt(HoodieLogFormat.CURRENT_VERSION);
     // Write out a length that does not conform with the content
-    outputStream.writeLong(500);
-    // Write out some bytes
+    outputStream.writeLong(400);
+    // Write out incomplete content
     outputStream.write("something-random".getBytes());
     outputStream.flush();
     outputStream.close();
 
+    // Append a proper block that is of the missing length of the corrupted block
+    writer =
+            HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+                    .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
+    records = SchemaTestUtil.generateTestRecords(0, 10);
+    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
+    dataBlock = new HoodieAvroDataBlock(records, header);
+    writer = writer.appendBlock(dataBlock);
+    writer.close();
+
+
     // First round of reads - we should be able to read the first block and then EOF
     Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema());
     assertTrue("First block should be available", reader.hasNext());
@@ -508,6 +519,8 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     assertTrue("We should have corrupted block next", reader.hasNext());
     HoodieLogBlock block = reader.next();
     assertEquals("The read block should be a corrupt block", 
HoodieLogBlockType.CORRUPT_BLOCK, block.getBlockType());
+    assertTrue("Third block should be available", reader.hasNext());
+    reader.next();
     assertFalse("There should be no more block left", reader.hasNext());
 
     reader.close();
@@ -543,6 +556,8 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     reader.next();
     assertTrue("We should get the 1st corrupted block next", reader.hasNext());
     reader.next();
+    assertTrue("Third block should be available", reader.hasNext());
+    reader.next();
     assertTrue("We should get the 2nd corrupted block next", reader.hasNext());
     block = reader.next();
     assertEquals("The read block should be a corrupt block", 
HoodieLogBlockType.CORRUPT_BLOCK, block.getBlockType());
