hudi-agent commented on code in PR #13870:
URL: https://github.com/apache/hudi/pull/13870#discussion_r3237149439


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java:
##########
@@ -204,6 +205,10 @@ public BaseStatistics getStatistics(BaseStatistics baseStatistics) {
     return null;
   }
 
+  public HoodieReadStats getLogReadStats() {
+    return ((HoodieFileGroupReader.HoodieFileGroupReaderIterator)this.iterator).getReadStats();

Review Comment:
   🤖 I think this cast will throw `ClassCastException` at runtime. 
`this.iterator` is set in `open()` via `initIterator()` → 
`getSplitRowIterator()` → `fileGroupReader.getClosableIterator()`, which 
returns a `CloseableMappingIterator<>(...)`, not a 
`HoodieFileGroupReaderIterator`. The new test only calls 
`getClosableBufferedRecordIterator()` directly on the reader (which does return 
`HoodieFileGroupReaderIterator`), so the production path through 
`StreamReadOperator.processSplits()` isn't actually covered. Could you add an 
end-to-end test that drives this through the input format and verify the metric 
is populated? @danny0405
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>
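
One way to keep the accessor safe while the end-to-end test is being added is to guard the cast. The sketch below is only an illustration: `ReadStats`, `StatsIterator`, and `MappingIterator` are hypothetical stand-ins for `HoodieReadStats`, `HoodieFileGroupReaderIterator`, and `CloseableMappingIterator`, not the real Hudi classes, and returning `Optional` is an assumed design choice rather than what the PR does.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.Optional;

final class GuardedReadStats {
  /** Hypothetical stand-in for HoodieReadStats. */
  static final class ReadStats {
    final long totalCorruptLogFiles;

    ReadStats(long totalCorruptLogFiles) {
      this.totalCorruptLogFiles = totalCorruptLogFiles;
    }
  }

  /** Hypothetical stand-in for an iterator type that exposes read stats. */
  static final class StatsIterator implements Iterator<String> {
    private final Iterator<String> delegate;
    private final ReadStats stats;

    StatsIterator(Iterator<String> delegate, ReadStats stats) {
      this.delegate = delegate;
      this.stats = stats;
    }

    @Override
    public boolean hasNext() {
      return delegate.hasNext();
    }

    @Override
    public String next() {
      return delegate.next();
    }

    ReadStats getReadStats() {
      return stats;
    }
  }

  /** Hypothetical stand-in for a mapping wrapper that hides the inner iterator type. */
  static final class MappingIterator implements Iterator<String> {
    private final Iterator<String> inner;

    MappingIterator(Iterator<String> inner) {
      this.inner = inner;
    }

    @Override
    public boolean hasNext() {
      return inner.hasNext();
    }

    @Override
    public String next() {
      return inner.next().toUpperCase();
    }
  }

  /** Returns the stats only when the iterator really is a StatsIterator; never throws. */
  static Optional<ReadStats> readStatsOf(Iterator<String> iterator) {
    if (iterator instanceof StatsIterator) {
      return Optional.of(((StatsIterator) iterator).getReadStats());
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    StatsIterator inner =
        new StatsIterator(Collections.singletonList("a").iterator(), new ReadStats(1));
    // Direct use: the stats are reachable.
    System.out.println(readStatsOf(inner).isPresent());
    // Wrapped use: an unchecked cast would throw ClassCastException here.
    System.out.println(readStatsOf(new MappingIterator(inner)).isPresent());
  }
}
```

A guard like this only avoids the exception. The metric would still be missing on the wrapped path, so the stats would need to be exposed through the wrapper or captured before wrapping.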



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java:
##########
@@ -385,6 +390,13 @@ private void scanInternalV1(Option<KeySpec> keySpecOpt) {
 
       // Done
       progress = 1.0f;
+      totalCorruptLogFiles.set(logFilePaths.size() - scannedLogFiles.size());

Review Comment:
   🤖 This change only updates `scanInternalV1`. `scanInternalV2` (selected when 
`enableOptimizedLogBlocksScan=true`, including some MDT code paths) also has 
its own `scannedLogFiles` tracking but never sets `totalCorruptLogFiles`, so 
the metric will silently stay at 0 for those readers. Could the same accounting 
be added to `scanInternalV2` as well so the metric works regardless of which 
scan version is in use?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>
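
A rough sketch of what shared accounting could look like, assuming both scan versions end up with a collection of successfully scanned files. `ScanAccountingSketch`, `scanV1`, `scanV2`, and `recordCorruptLogFiles` are hypothetical names for illustration; the real scan logic in `BaseHoodieLogRecordReader` is omitted.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

final class ScanAccountingSketch {
  private final List<String> logFilePaths;
  private final AtomicLong totalCorruptLogFiles = new AtomicLong(0);

  ScanAccountingSketch(List<String> logFilePaths) {
    this.logFilePaths = logFilePaths;
  }

  /** Stand-in for scanInternalV1; only the final accounting step is shown. */
  void scanV1(Set<String> scannedLogFiles) {
    recordCorruptLogFiles(scannedLogFiles);
  }

  /** Stand-in for scanInternalV2; it calls the same helper so the metric cannot diverge. */
  void scanV2(Set<String> scannedLogFiles) {
    recordCorruptLogFiles(scannedLogFiles);
  }

  /** Same arithmetic as the diff: requested log files minus successfully scanned ones. */
  private void recordCorruptLogFiles(Set<String> scannedLogFiles) {
    totalCorruptLogFiles.set(logFilePaths.size() - scannedLogFiles.size());
  }

  long getTotalCorruptLogFiles() {
    return totalCorruptLogFiles.get();
  }

  public static void main(String[] args) {
    ScanAccountingSketch reader = new ScanAccountingSketch(Arrays.asList("log.1", "log.2"));
    reader.scanV2(Collections.singleton("log.1"));
    System.out.println(reader.getTotalCorruptLogFiles()); // prints 1
  }
}
```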



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java:
##########
@@ -385,6 +390,13 @@ private void scanInternalV1(Option<KeySpec> keySpecOpt) {
 
       // Done
       progress = 1.0f;
+      totalCorruptLogFiles.set(logFilePaths.size() - scannedLogFiles.size());
+      if (totalCorruptLogFiles.get() > 0) {
+        List<String> scannedLogFilesNames = scannedLogFiles.stream().map(HoodieLogFile::getPath)
+            .map(StoragePath::getName).collect(Collectors.toList());
+        corruptLogFilesList = CollectionUtils.diff(logFilePaths, scannedLogFilesNames);

Review Comment:
   🤖 I think the comparison here is mismatched. `logFilePaths` holds full path 
strings (set in `HoodieMergedLogRecordReader.Builder.withLogFiles` via 
`l.getPath().toString()`), but `scannedLogFilesNames` extracts just the file 
name via `StoragePath::getName`. `CollectionUtils.diff` removes by `equals`, so 
no full path will ever match a bare name and `corruptLogFilesList` ends up 
equal to all of `logFilePaths`. The numeric metric is correct, but the log line 
below will list every log file as "corrupt". Could you use 
`StoragePath::toString` (or compare names on both sides) instead?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>
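
The mismatch can be reproduced with plain strings. In this minimal sketch, `diff` is a stand-in that assumes `CollectionUtils.diff` removes elements by `equals`, `fileName` approximates `StoragePath::getName`, and the paths are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class CorruptLogFileDiff {
  /** Returns the entries of {@code all} that are not in {@code scanned}, compared by equals. */
  static List<String> diff(List<String> all, List<String> scanned) {
    List<String> result = new ArrayList<>(all);
    result.removeAll(scanned);
    return result;
  }

  /** Extracts the last path component, as a stand-in for StoragePath::getName. */
  static String fileName(String fullPath) {
    return fullPath.substring(fullPath.lastIndexOf('/') + 1);
  }

  public static void main(String[] args) {
    // Hypothetical paths: two log files requested, only the first one scanned successfully.
    List<String> logFilePaths = Arrays.asList("/tbl/p1/.f1.log.1", "/tbl/p1/.f1.log.2");
    List<String> scannedPaths = Arrays.asList("/tbl/p1/.f1.log.1");

    // Mismatched comparison: full paths against bare names, so nothing is removed.
    List<String> scannedNames =
        scannedPaths.stream().map(CorruptLogFileDiff::fileName).collect(Collectors.toList());
    System.out.println(diff(logFilePaths, scannedNames));
    // prints [/tbl/p1/.f1.log.1, /tbl/p1/.f1.log.2]

    // Consistent comparison: full paths on both sides, so only the unscanned file remains.
    System.out.println(diff(logFilePaths, scannedPaths));
    // prints [/tbl/p1/.f1.log.2]
  }
}
```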



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java:
##########
@@ -122,6 +123,10 @@ public abstract class BaseHoodieLogRecordReader<T> {
   private AtomicLong totalRollbacks = new AtomicLong(0);
   // Total number of corrupt blocks written across all log files
   private AtomicLong totalCorruptBlocks = new AtomicLong(0);
+  // Total number of corrupt log files - for metrics
+  private AtomicLong totalCorruptLogFiles = new AtomicLong(0);
+  // Corrupt log files list - for debug

Review Comment:
   🤖 nit: `corruptLogFilesList` encodes the container type in the name — the 
`List<String>` declaration already conveys that. Could you rename it to 
`corruptLogFiles` to avoid the redundancy?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java:
##########
@@ -296,6 +309,72 @@ public void testReadLogFilesOnlyInMergeOnReadTable(RecordMergeMode recordMergeMo
     }
   }
 
+  @ParameterizedTest
+  @MethodSource("logFileOnlyCases")
+  public void testReadLogFilesOnlyInMergeOnReadTableAndCheckTotalCorruptLogFilesMetric(RecordMergeMode recordMergeMode, String logDataBlockFormat) throws Exception {
+    Map<String, String> writeConfigs = new HashMap<>(getCommonConfigs(recordMergeMode, true));
+    writeConfigs.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), logDataBlockFormat);
+    // Use InMemoryIndex to generate log only mor table
+    writeConfigs.put("hoodie.index.type", "INMEMORY");
+
+    // use only one partition

Review Comment:
   🤖 nit: `DEFAULT_PARTITION_PATH` uses SCREAMING_SNAKE_CASE, which by Java 
convention signals a `static final` constant, not a local variable. Could you 
rename it to `defaultPartitionPath` (or `partitionPaths`) so readers don't do a 
double-take?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java:
##########
@@ -296,6 +309,72 @@ public void testReadLogFilesOnlyInMergeOnReadTable(RecordMergeMode recordMergeMo
     }
   }
 
+  @ParameterizedTest
+  @MethodSource("logFileOnlyCases")
+  public void testReadLogFilesOnlyInMergeOnReadTableAndCheckTotalCorruptLogFilesMetric(RecordMergeMode recordMergeMode, String logDataBlockFormat) throws Exception {
+    Map<String, String> writeConfigs = new HashMap<>(getCommonConfigs(recordMergeMode, true));
+    writeConfigs.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), logDataBlockFormat);
+    // Use InMemoryIndex to generate log only mor table
+    writeConfigs.put("hoodie.index.type", "INMEMORY");
+
+    // use only one partition
+    final String[] DEFAULT_PARTITION_PATH =
+        {HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH};
+
+    try (HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0xDEEF, DEFAULT_PARTITION_PATH, new HashMap<>())) {
+      // One commit; reading one file group containing a log file only
+      List<HoodieRecord> initialRecords = dataGen.generateInserts("001", 100);
+      commitToTable(initialRecords, UPSERT.value(), true, writeConfigs, TRIP_EXAMPLE_SCHEMA);
+      validateOutputFromFileGroupReader(
+          getStorageConf(), getBasePath(), false, 1, recordMergeMode,
+          initialRecords, initialRecords, new String[]{ORDERING_FIELD_NAME});
+    }
+
+    HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient(getStorageConf(), getBasePath());
+    HoodieEngineContext engineContext = new HoodieLocalEngineContext(getStorageConf());
+    HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder().build();
+    FileSystemViewManager viewManager = FileSystemViewManager.createViewManager(
+        engineContext,
+        metadataConfig,
+        FileSystemViewStorageConfig.newBuilder().build(),
+        HoodieCommonConfig.newBuilder().build(),
+        mc -> metaClient.getTableFormat().getMetadataFactory().create(
+            engineContext, mc.getStorage(), metadataConfig, getBasePath()));
+    HoodieTableFileSystemView fsView =
+        (HoodieTableFileSystemView) viewManager.getFileSystemView(metaClient);
+    List<String> relativePartitionPathList = FSUtils.getAllPartitionPaths(engineContext, metaClient, metadataConfig);
+    List<FileSlice> fileSlices =
+        relativePartitionPathList.stream().flatMap(fsView::getAllFileSlices)
+            .collect(Collectors.toList());
+
+    // current partition just only one log

Review Comment:
   🤖 nit: `beforeFileSlices` is collected with exactly the same stream 
expression as `fileSlices` a few lines above, so this block re-fetches 
identical data. Could you replace `beforeFileSlices` with `fileSlices` in the 
assertion and drop the re-collection entirely?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java:
##########
@@ -296,6 +309,72 @@ public void testReadLogFilesOnlyInMergeOnReadTable(RecordMergeMode recordMergeMo
     }
   }
 
+  @ParameterizedTest
+  @MethodSource("logFileOnlyCases")
+  public void testReadLogFilesOnlyInMergeOnReadTableAndCheckTotalCorruptLogFilesMetric(RecordMergeMode recordMergeMode, String logDataBlockFormat) throws Exception {
+    Map<String, String> writeConfigs = new HashMap<>(getCommonConfigs(recordMergeMode, true));
+    writeConfigs.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), logDataBlockFormat);
+    // Use InMemoryIndex to generate log only mor table
+    writeConfigs.put("hoodie.index.type", "INMEMORY");
+
+    // use only one partition
+    final String[] DEFAULT_PARTITION_PATH =
+        {HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH};
+
+    try (HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0xDEEF, DEFAULT_PARTITION_PATH, new HashMap<>())) {
+      // One commit; reading one file group containing a log file only
+      List<HoodieRecord> initialRecords = dataGen.generateInserts("001", 100);
+      commitToTable(initialRecords, UPSERT.value(), true, writeConfigs, TRIP_EXAMPLE_SCHEMA);
+      validateOutputFromFileGroupReader(
+          getStorageConf(), getBasePath(), false, 1, recordMergeMode,
+          initialRecords, initialRecords, new String[]{ORDERING_FIELD_NAME});
+    }
+
+    HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient(getStorageConf(), getBasePath());
+    HoodieEngineContext engineContext = new HoodieLocalEngineContext(getStorageConf());
+    HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder().build();
+    FileSystemViewManager viewManager = FileSystemViewManager.createViewManager(
+        engineContext,
+        metadataConfig,
+        FileSystemViewStorageConfig.newBuilder().build(),
+        HoodieCommonConfig.newBuilder().build(),
+        mc -> metaClient.getTableFormat().getMetadataFactory().create(
+            engineContext, mc.getStorage(), metadataConfig, getBasePath()));
+    HoodieTableFileSystemView fsView =
+        (HoodieTableFileSystemView) viewManager.getFileSystemView(metaClient);
+    List<String> relativePartitionPathList = FSUtils.getAllPartitionPaths(engineContext, metaClient, metadataConfig);
+    List<FileSlice> fileSlices =
+        relativePartitionPathList.stream().flatMap(fsView::getAllFileSlices)
+            .collect(Collectors.toList());
+
+    // current partition just only one log
+    List<FileSlice> beforeFileSlices =
+        relativePartitionPathList.stream().flatMap(fsView::getAllFileSlices)
+            .collect(Collectors.toList());
+    assertEquals(beforeFileSlices.size(), 1);
+
+    // overwrite the current log in this partition to mock the log's magic num is broken or log is corrupt
+    HoodieLogFile hoodieLogFile = fileSlices.get(0).getLatestLogFile().get();
+    OutputStream outputStream = metaClient.getStorage().create(hoodieLogFile.getPath());
+    outputStream.write(0);
+    outputStream.close();
+
+    List<FileSlice> newFileSlices =
+        relativePartitionPathList.stream().flatMap(fsView::getAllFileSlices)
+            .collect(Collectors.toList());
+    assertEquals(newFileSlices.size(), 1);
+
+    TypedProperties props = buildProperties(metaClient, recordMergeMode);
+    Schema avroSchema = new TableSchemaResolver(metaClient).getTableAvroSchema();
+    HoodieFileGroupReader<RowData> hoodieFileGroupReader =
+        getHoodieFileGroupReader(getStorageConf(), getBasePath(), metaClient, avroSchema, newFileSlices.get(0), 0, props, false);
+
+    HoodieFileGroupReader.HoodieFileGroupReaderIterator closableIterator
+        = (HoodieFileGroupReader.HoodieFileGroupReaderIterator) hoodieFileGroupReader.getClosableBufferedRecordIterator();
+
+    assertEquals(closableIterator.getReadStats().getTotalCorruptLogFiles(),1);

Review Comment:
   🤖 nit: JUnit's `assertEquals` convention is `(expected, actual)`, but the 
arguments here are swapped. Could you flip them to `assertEquals(1, 
closableIterator.getReadStats().getTotalCorruptLogFiles())`? That way failure 
messages say "expected 1 but was X" rather than the reverse.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
