prashantwason commented on code in PR #8526:
URL: https://github.com/apache/hudi/pull/8526#discussion_r1213452295


##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java:
##########
@@ -152,98 +153,107 @@ private void addShutDownHook() {
   // TODO : convert content and block length to long by using ByteBuffer, raw 
byte [] allows
   // for max of Integer size
   private HoodieLogBlock readBlock() throws IOException {
-    int blockSize;
-    long blockStartPos = inputStream.getPos();
-    try {
-      // 1 Read the total size of the block
-      blockSize = (int) inputStream.readLong();
-    } catch (EOFException | CorruptedLogFileException e) {
-      // An exception reading any of the above indicates a corrupt block
-      // Create a corrupt block by finding the next MAGIC marker or EOF
-      return createCorruptBlock(blockStartPos);
-    }
-
-    // We may have had a crash which could have written this block partially
-    // Skip blockSize in the stream and we should either find a sync marker 
(start of the next
-    // block) or EOF. If we did not find either of it, then this block is a 
corrupted block.
-    boolean isCorrupted = isBlockCorrupted(blockSize);
-    if (isCorrupted) {
-      return createCorruptBlock(blockStartPos);
-    }
-
-    // 2. Read the version for this log format
-    HoodieLogFormat.LogFormatVersion nextBlockVersion = readVersion();
+    long blockStartPos = 0;
+    long blockSize = 0;
 
-    // 3. Read the block type for a log block
-    HoodieLogBlockType blockType = tryReadBlockType(nextBlockVersion);
+    try {
+      blockStartPos = inputStream.getPos();
 
-    // 4. Read the header for a log block, if present
+      // 1 Read the total size of the block
+      blockSize = inputStream.readLong();
+
+      // We may have had a crash which could have written this block 
partially. We are deferring the check for corrupted block so as not to pay the
+      // penalty of doing seeks + read and then re-seeks. More aggressive 
checks after reading each item as well as a final corrupted check should ensure 
we
+      // find the corrupted block eventually.
+
+      // 2. Read the version for this log format
+      HoodieLogFormat.LogFormatVersion nextBlockVersion = readVersion();
+
+      // 3. Read the block type for a log block
+      HoodieLogBlockType blockType = tryReadBlockType(nextBlockVersion);
+
+      // 4. Read the header for a log block, if present
+      Map<HeaderMetadataType, String> header =
+          nextBlockVersion.hasHeader() ? 
HoodieLogBlock.getLogMetadata(inputStream) : null;
+
+      // 5. Read the content length for the content
+      // Fallback to full-block size if no content-length
+      // TODO replace w/ hasContentLength
+      long contentLength =
+          nextBlockVersion.getVersion() != 
HoodieLogFormatVersion.DEFAULT_VERSION ? (int) inputStream.readLong() : 
blockSize;
+      checkArgument(contentLength >= 0, "Content Length should be greater than 
or equal to 0 " + contentLength);
+
+      // 6. Read the content or skip content based on IO vs Memory trade-off 
by client
+      long contentPosition = inputStream.getPos();
+      boolean shouldReadLazily = readBlockLazily && 
nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION;
+      Option<byte[]> content = HoodieLogBlock.tryReadContent(inputStream, 
contentLength, shouldReadLazily);
+
+      // 7. Read footer if any
+      Map<HeaderMetadataType, String> footer =
+          nextBlockVersion.hasFooter() ? 
HoodieLogBlock.getLogMetadata(inputStream) : null;
+
+      // 8. Read log block length, if present. This acts as a reverse pointer 
when traversing a
+      // log file in reverse
+      if (nextBlockVersion.hasLogBlockLength()) {
+        long currentPos = inputStream.getPos();
+        long logBlockLength = inputStream.readLong();
+        if (blockSize != (logBlockLength - magicBuffer.length) || currentPos 
!= (blockStartPos + blockSize)) {
+          return createCorruptBlock(blockStartPos);
+        }
+      }
 
-    Map<HeaderMetadataType, String> header =
-        nextBlockVersion.hasHeader() ? 
HoodieLogBlock.getLogMetadata(inputStream) : null;
+      // 9. Read the log block end position in the log file
+      long blockEndPos = inputStream.getPos();
 
-    // 5. Read the content length for the content
-    // Fallback to full-block size if no content-length
-    // TODO replace w/ hasContentLength
-    int contentLength =
-        nextBlockVersion.getVersion() != 
HoodieLogFormatVersion.DEFAULT_VERSION ? (int) inputStream.readLong() : 
blockSize;
+      HoodieLogBlock.HoodieLogBlockContentLocation logBlockContentLoc =
+          new HoodieLogBlock.HoodieLogBlockContentLocation(hadoopConf, 
logFile, contentPosition, contentLength, blockEndPos);
 
-    // 6. Read the content or skip content based on IO vs Memory trade-off by 
client
-    long contentPosition = inputStream.getPos();
-    boolean shouldReadLazily = readBlockLazily && 
nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION;
-    Option<byte[]> content = HoodieLogBlock.tryReadContent(inputStream, 
contentLength, shouldReadLazily);
+      switch (Objects.requireNonNull(blockType)) {
+        case AVRO_DATA_BLOCK:
+          if (nextBlockVersion.getVersion() == 
HoodieLogFormatVersion.DEFAULT_VERSION) {
+            return HoodieAvroDataBlock.getBlock(content.get(), readerSchema, 
internalSchema);
+          } else {
+            return new HoodieAvroDataBlock(inputStream, content, 
readBlockLazily, logBlockContentLoc,
+                getTargetReaderSchemaForBlock(), header, footer, keyField);
+          }
 
-    // 7. Read footer if any
-    Map<HeaderMetadataType, String> footer =
-        nextBlockVersion.hasFooter() ? 
HoodieLogBlock.getLogMetadata(inputStream) : null;
+        case HFILE_DATA_BLOCK:
+          checkState(nextBlockVersion.getVersion() != 
HoodieLogFormatVersion.DEFAULT_VERSION,
+              String.format("HFile block could not be of version (%d)", 
HoodieLogFormatVersion.DEFAULT_VERSION));
 
-    // 8. Read log block length, if present. This acts as a reverse pointer 
when traversing a
-    // log file in reverse
-    if (nextBlockVersion.hasLogBlockLength()) {
-      inputStream.readLong();
-    }
+          return new HoodieHFileDataBlock(inputStream, content, 
readBlockLazily, logBlockContentLoc,
+              Option.ofNullable(readerSchema), header, footer, 
enableRecordLookups, logFile.getPath());
 
-    // 9. Read the log block end position in the log file
-    long blockEndPos = inputStream.getPos();
+        case PARQUET_DATA_BLOCK:
+          checkState(nextBlockVersion.getVersion() != 
HoodieLogFormatVersion.DEFAULT_VERSION,
+              String.format("Parquet block could not be of version (%d)", 
HoodieLogFormatVersion.DEFAULT_VERSION));
 
-    HoodieLogBlock.HoodieLogBlockContentLocation logBlockContentLoc =
-        new HoodieLogBlock.HoodieLogBlockContentLocation(hadoopConf, logFile, 
contentPosition, contentLength, blockEndPos);
-
-    switch (Objects.requireNonNull(blockType)) {
-      case AVRO_DATA_BLOCK:
-        if (nextBlockVersion.getVersion() == 
HoodieLogFormatVersion.DEFAULT_VERSION) {
-          return HoodieAvroDataBlock.getBlock(content.get(), readerSchema, 
internalSchema);
-        } else {
-          return new HoodieAvroDataBlock(inputStream, content, 
readBlockLazily, logBlockContentLoc,
+          return new HoodieParquetDataBlock(inputStream, content, 
readBlockLazily, logBlockContentLoc,
               getTargetReaderSchemaForBlock(), header, footer, keyField);
-        }
-
-      case HFILE_DATA_BLOCK:
-        checkState(nextBlockVersion.getVersion() != 
HoodieLogFormatVersion.DEFAULT_VERSION,
-            String.format("HFile block could not be of version (%d)", 
HoodieLogFormatVersion.DEFAULT_VERSION));
-
-        return new HoodieHFileDataBlock(inputStream, content, readBlockLazily, 
logBlockContentLoc,
-            Option.ofNullable(readerSchema), header, footer, 
enableRecordLookups, logFile.getPath());
 
-      case PARQUET_DATA_BLOCK:
-        checkState(nextBlockVersion.getVersion() != 
HoodieLogFormatVersion.DEFAULT_VERSION,
-            String.format("Parquet block could not be of version (%d)", 
HoodieLogFormatVersion.DEFAULT_VERSION));
+        case DELETE_BLOCK:
+          return new HoodieDeleteBlock(content, inputStream, readBlockLazily, 
Option.of(logBlockContentLoc), header, footer);
 
-        return new HoodieParquetDataBlock(inputStream, content, 
readBlockLazily, logBlockContentLoc,
-            getTargetReaderSchemaForBlock(), header, footer, keyField);
+        case COMMAND_BLOCK:
+          return new HoodieCommandBlock(content, inputStream, readBlockLazily, 
Option.of(logBlockContentLoc), header, footer);
 
-      case DELETE_BLOCK:
-        return new HoodieDeleteBlock(content, inputStream, readBlockLazily, 
Option.of(logBlockContentLoc), header, footer);
+        case CDC_DATA_BLOCK:
+          return new HoodieCDCDataBlock(inputStream, content, readBlockLazily, 
logBlockContentLoc, readerSchema, header, keyField);
 
-      case COMMAND_BLOCK:
-        return new HoodieCommandBlock(content, inputStream, readBlockLazily, 
Option.of(logBlockContentLoc), header, footer);
-
-      case CDC_DATA_BLOCK:
-        return new HoodieCDCDataBlock(inputStream, content, readBlockLazily, 
logBlockContentLoc, readerSchema, header, keyField);
-
-      default:
-        throw new HoodieNotSupportedException("Unsupported Block " + 
blockType);
+        default:
+          throw new HoodieNotSupportedException("Unsupported Block " + 
blockType);
+      }
+    } catch (IOException | CorruptedLogFileException | 
IllegalArgumentException e) {
+      // check for corrupt block
+      inputStream.seek(blockStartPos);
+      if (isBlockCorrupted(blockSize)) {

Review Comment:
   Moved the various exceptions into their own catch blocks to make the handling 
clearer.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to