alexeykudinkin commented on a change in pull request #4333:
URL: https://github.com/apache/hudi/pull/4333#discussion_r778577147



##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
##########
@@ -208,71 +155,85 @@ private HoodieLogBlock readBlock() throws IOException {
     HoodieLogFormat.LogFormatVersion nextBlockVersion = readVersion();
 
     // 3. Read the block type for a log block
-    if (nextBlockVersion.getVersion() != 
HoodieLogFormatVersion.DEFAULT_VERSION) {
-      type = inputStream.readInt();
-
-      ValidationUtils.checkArgument(type < HoodieLogBlockType.values().length, 
"Invalid block byte type found " + type);
-      blockType = HoodieLogBlockType.values()[type];
-    }
+    HoodieLogBlockType blockType = tryReadBlockType(nextBlockVersion);
 
     // 4. Read the header for a log block, if present
-    if (nextBlockVersion.hasHeader()) {
-      header = HoodieLogBlock.getLogMetadata(inputStream);
-    }
 
-    int contentLength = blocksize;
+    Map<HeaderMetadataType, String> header =
+        nextBlockVersion.hasHeader() ? 
HoodieLogBlock.getLogMetadata(inputStream) : null;
+
     // 5. Read the content length for the content
-    if (nextBlockVersion.getVersion() != 
HoodieLogFormatVersion.DEFAULT_VERSION) {
-      contentLength = (int) inputStream.readLong();
-    }
+    // Fallback to full-block size if no content-length
+    // TODO replace w/ hasContentLength
+    int contentLength =
+        nextBlockVersion.getVersion() != 
HoodieLogFormatVersion.DEFAULT_VERSION ? (int) inputStream.readLong() : 
blockSize;
 
     // 6. Read the content or skip content based on IO vs Memory trade-off by 
client
-    // TODO - have a max block size and reuse this buffer in the ByteBuffer
-    // (hard to guess max block size for now)
     long contentPosition = inputStream.getPos();
-    byte[] content = HoodieLogBlock.readOrSkipContent(inputStream, 
contentLength, readBlockLazily);
+    boolean shouldReadLazily = readBlockLazily && 
nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION;
+    Option<byte[]> content = HoodieLogBlock.tryReadContent(inputStream, 
contentLength, shouldReadLazily);
 
     // 7. Read footer if any
-    Map<HeaderMetadataType, String> footer = null;
-    if (nextBlockVersion.hasFooter()) {
-      footer = HoodieLogBlock.getLogMetadata(inputStream);
-    }
+    Map<HeaderMetadataType, String> footer =
+        nextBlockVersion.hasFooter() ? 
HoodieLogBlock.getLogMetadata(inputStream) : null;
 
     // 8. Read log block length, if present. This acts as a reverse pointer 
when traversing a
     // log file in reverse
-    @SuppressWarnings("unused")
-    long logBlockLength = 0;
     if (nextBlockVersion.hasLogBlockLength()) {
-      logBlockLength = inputStream.readLong();
+      inputStream.readLong();
     }
 
     // 9. Read the log block end position in the log file
     long blockEndPos = inputStream.getPos();
 
     switch (Objects.requireNonNull(blockType)) {
-      // based on type read the block
       case AVRO_DATA_BLOCK:
         if (nextBlockVersion.getVersion() == 
HoodieLogFormatVersion.DEFAULT_VERSION) {
-          return HoodieAvroDataBlock.getBlock(content, readerSchema);
+          return HoodieAvroDataBlock.getBlock(content.get(), readerSchema);
         } else {
-          return new HoodieAvroDataBlock(logFile, inputStream, 
Option.ofNullable(content), readBlockLazily,
-              contentPosition, contentLength, blockEndPos, readerSchema, 
header, footer, keyField);
+          return new HoodieAvroDataBlock(logFile, inputStream, content, 
readBlockLazily,
+              contentPosition, contentLength, blockEndPos, 
Option.ofNullable(readerSchema), header, footer, keyField);
         }
+
       case HFILE_DATA_BLOCK:
-        return new HoodieHFileDataBlock(logFile, inputStream, 
Option.ofNullable(content), readBlockLazily,
-            contentPosition, contentLength, blockEndPos, readerSchema,
-            header, footer, enableInlineReading, keyField);
+        checkState(nextBlockVersion.getVersion() != 
HoodieLogFormatVersion.DEFAULT_VERSION,
+            String.format("HFile block could not be of version (%d)", 
HoodieLogFormatVersion.DEFAULT_VERSION));
+
+        return new HoodieHFileDataBlock(logFile, inputStream, content, 
readBlockLazily,
+            contentPosition, contentLength, blockEndPos, 
Option.ofNullable(readerSchema),
+            header, footer, keyField, enableRecordLookups);
+
+      case PARQUET_DATA_BLOCK:
+        checkState(nextBlockVersion.getVersion() != 
HoodieLogFormatVersion.DEFAULT_VERSION,
+            String.format("Parquet block could not be of version (%d)", 
HoodieLogFormatVersion.DEFAULT_VERSION));
+
+        return new HoodieParquetDataBlock(logFile, inputStream, content, 
readBlockLazily,
+            contentPosition, contentLength, blockEndPos, 
Option.ofNullable(readerSchema), header, footer, keyField);
+
       case DELETE_BLOCK:
-        return HoodieDeleteBlock.getBlock(logFile, inputStream, 
Option.ofNullable(content), readBlockLazily,
+        return HoodieDeleteBlock.getBlock(logFile, inputStream, content, 
readBlockLazily,
             contentPosition, contentLength, blockEndPos, header, footer);
+
       case COMMAND_BLOCK:
-        return HoodieCommandBlock.getBlock(logFile, inputStream, 
Option.ofNullable(content), readBlockLazily,
+        return HoodieCommandBlock.getBlock(logFile, inputStream, content, 
readBlockLazily,
             contentPosition, contentLength, blockEndPos, header, footer);
+
       default:
         throw new HoodieNotSupportedException("Unsupported Block " + 
blockType);
     }
   }
 
+  @Nullable
+  private HoodieLogBlockType tryReadBlockType(HoodieLogFormat.LogFormatVersion 
blockVersion) throws IOException {
+    if (blockVersion.getVersion() == HoodieLogFormatVersion.DEFAULT_VERSION) {
+      return null;

Review comment:
       While addressing this, I've stumbled upon peculiar behavior that is 
currently implemented: for a `DEFAULT_VERSION` block, reading would actually fail, 
since it doesn't contain a block type and we assert at line 252 (before this change) 
that it's not null. 
   
   While this doesn't seem right, I don't think I have a good idea of what 
the behavior should be, since we can't reliably determine what block is in front 
of us, and it seems that `DEFAULT_VERSION` supported Avro, Delete, and Corrupt blocks.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to