hudi-agent commented on code in PR #13870:
URL: https://github.com/apache/hudi/pull/13870#discussion_r3237149439
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java:
##########
@@ -204,6 +205,10 @@ public BaseStatistics getStatistics(BaseStatistics baseStatistics) {
return null;
}
+ public HoodieReadStats getLogReadStats() {
+ return ((HoodieFileGroupReader.HoodieFileGroupReaderIterator)this.iterator).getReadStats();
Review Comment:
🤖 I think this cast will throw `ClassCastException` at runtime.
`this.iterator` is set in `open()` via `initIterator()` →
`getSplitRowIterator()` → `fileGroupReader.getClosableIterator()`, which
returns a `CloseableMappingIterator<>(...)`, not a
`HoodieFileGroupReaderIterator`. The new test only calls
`getClosableBufferedRecordIterator()` directly on the reader (which does return
`HoodieFileGroupReaderIterator`), so the production path through
`StreamReadOperator.processSplits()` isn't actually covered. Could you add an
end-to-end test that drives this through the input format and verifies that the
metric is populated? @danny0405
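
To illustrate the failure mode, here is a minimal, self-contained sketch. `StatsIterator` and `MappingIterator` are hypothetical stand-ins, not Hudi's actual classes; the sketch only assumes that the wrapper does not extend the stats-bearing iterator type. It contrasts the blind cast with an `instanceof` guard:

```java
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

public class IteratorCastSketch {

  /** Hypothetical stand-in for an iterator that also exposes read stats. */
  public static class StatsIterator implements Iterator<String> {
    private final Iterator<String> source;
    private long recordsRead = 0;

    public StatsIterator(List<String> records) {
      this.source = records.iterator();
    }

    @Override
    public boolean hasNext() {
      return source.hasNext();
    }

    @Override
    public String next() {
      recordsRead++;
      return source.next();
    }

    public long getRecordsRead() {
      return recordsRead;
    }
  }

  /** Hypothetical stand-in for a mapping wrapper; it hides the wrapped iterator's type. */
  public static class MappingIterator<I, O> implements Iterator<O> {
    private final Iterator<I> source;
    private final Function<I, O> mapper;

    public MappingIterator(Iterator<I> source, Function<I, O> mapper) {
      this.source = source;
      this.mapper = mapper;
    }

    @Override
    public boolean hasNext() {
      return source.hasNext();
    }

    @Override
    public O next() {
      return mapper.apply(source.next());
    }
  }

  /** Blind cast, mirroring the flagged pattern: throws ClassCastException for a wrapper. */
  public static long recordsReadUnchecked(Iterator<?> iterator) {
    return ((StatsIterator) iterator).getRecordsRead();
  }

  /** Guarded variant: returns the stats only when the iterator really is a StatsIterator. */
  public static Optional<Long> recordsReadIfAvailable(Iterator<?> iterator) {
    if (iterator instanceof StatsIterator) {
      return Optional.of(((StatsIterator) iterator).getRecordsRead());
    }
    return Optional.empty();
  }
}
```

A guard like this only avoids the exception; the stats would still be unreachable through the wrapper, so the production path likely needs a way to reach the underlying iterator or the reader's stats directly.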
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java:
##########
@@ -385,6 +390,13 @@ private void scanInternalV1(Option<KeySpec> keySpecOpt) {
// Done
progress = 1.0f;
+ totalCorruptLogFiles.set(logFilePaths.size() - scannedLogFiles.size());
Review Comment:
🤖 This change only updates `scanInternalV1`. `scanInternalV2` (selected when
`enableOptimizedLogBlocksScan=true`, including some metadata table (MDT) code
paths) also has its own `scannedLogFiles` tracking but never sets
`totalCorruptLogFiles`, so the metric will silently stay at 0 for those readers.
Could the same accounting be added to `scanInternalV2` as well so the metric
works regardless of which scan version is in use?
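
One way to keep both paths in sync is to route the accounting through a single helper. The sketch below is a rough illustration only: the class, `scanV1`/`scanV2`, and `recordCorruptLogFiles` are hypothetical names, and the `readablePaths` argument merely simulates which files scan cleanly.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

public class CorruptLogFileAccountingSketch {
  private final List<String> logFilePaths;
  private final AtomicLong totalCorruptLogFiles = new AtomicLong(0);

  public CorruptLogFileAccountingSketch(List<String> logFilePaths) {
    this.logFilePaths = logFilePaths;
  }

  /** Stand-in for the V1 scan path. */
  public void scanV1(Set<String> readablePaths) {
    List<String> scanned = scan(readablePaths);
    recordCorruptLogFiles(scanned);
  }

  /** Stand-in for the V2 scan path; it calls the same helper, so the metric cannot drift. */
  public void scanV2(Set<String> readablePaths) {
    List<String> scanned = scan(readablePaths);
    recordCorruptLogFiles(scanned);
  }

  /** Simulated scan: keeps only the paths marked as readable. */
  private List<String> scan(Set<String> readablePaths) {
    return logFilePaths.stream()
        .filter(readablePaths::contains)
        .collect(Collectors.toList());
  }

  /** Shared accounting: files requested minus files actually scanned. */
  private void recordCorruptLogFiles(List<String> scannedLogFiles) {
    totalCorruptLogFiles.set(logFilePaths.size() - scannedLogFiles.size());
  }

  public long getTotalCorruptLogFiles() {
    return totalCorruptLogFiles.get();
  }
}
```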
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java:
##########
@@ -385,6 +390,13 @@ private void scanInternalV1(Option<KeySpec> keySpecOpt) {
// Done
progress = 1.0f;
+ totalCorruptLogFiles.set(logFilePaths.size() - scannedLogFiles.size());
+ if (totalCorruptLogFiles.get() > 0) {
+ List<String> scannedLogFilesNames = scannedLogFiles.stream().map(HoodieLogFile::getPath)
+ .map(StoragePath::getName).collect(Collectors.toList());
+ corruptLogFilesList = CollectionUtils.diff(logFilePaths, scannedLogFilesNames);
Review Comment:
🤖 I think the comparison here is mismatched. `logFilePaths` holds full path
strings (set in `HoodieMergedLogRecordReader.Builder.withLogFiles` via
`l.getPath().toString()`), but `scannedLogFilesNames` extracts just the file
name via `StoragePath::getName`. `CollectionUtils.diff` removes by `equals`, so
no full path will ever match a bare name and `corruptLogFilesList` ends up
equal to all of `logFilePaths`. The numeric metric is correct, but the log line
below will list every log file as "corrupt". Could you use
`StoragePath::toString` (or compare names on both sides) instead?
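
A small plain-Java sketch of the mismatch, using made-up paths. The `diff` helper here is a hypothetical stand-in that removes elements by `equals` (the behaviour described above for `CollectionUtils.diff`), and `fileName` is an illustrative substitute for `StoragePath::getName`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class PathDiffSketch {

  /** Hypothetical stand-in for a diff that removes elements by equals(). */
  public static List<String> diff(List<String> all, List<String> toRemove) {
    List<String> result = new ArrayList<>(all);
    result.removeAll(toRemove);
    return result;
  }

  /** Illustrative name extraction: everything after the last '/'. */
  public static String fileName(String fullPath) {
    return fullPath.substring(fullPath.lastIndexOf('/') + 1);
  }

  /** Maps a list of full paths to their bare file names. */
  public static List<String> fileNames(List<String> fullPaths) {
    return fullPaths.stream().map(PathDiffSketch::fileName).collect(Collectors.toList());
  }

  /** Mismatched comparison: full paths on one side, bare names on the other. */
  public static List<String> corruptMismatched(List<String> logFilePaths, List<String> scannedPaths) {
    return diff(logFilePaths, fileNames(scannedPaths));
  }

  /** Consistent comparison: full paths on both sides. */
  public static List<String> corruptByFullPath(List<String> logFilePaths, List<String> scannedPaths) {
    return diff(logFilePaths, scannedPaths);
  }
}
```

With two log files of which one scans cleanly, `corruptMismatched` returns both paths, while `corruptByFullPath` returns only the one that was not scanned.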
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java:
##########
@@ -122,6 +123,10 @@ public abstract class BaseHoodieLogRecordReader<T> {
private AtomicLong totalRollbacks = new AtomicLong(0);
// Total number of corrupt blocks written across all log files
private AtomicLong totalCorruptBlocks = new AtomicLong(0);
+ // Total number of corrupt log files - for metrics
+ private AtomicLong totalCorruptLogFiles = new AtomicLong(0);
+ // Corrupt log files list - for debug
Review Comment:
🤖 nit: `corruptLogFilesList` encodes the container type in the name — the
`List<String>` declaration already conveys that. Could you rename it to
`corruptLogFiles` to avoid the redundancy?
##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java:
##########
@@ -296,6 +309,72 @@ public void
testReadLogFilesOnlyInMergeOnReadTable(RecordMergeMode recordMergeMo
}
}
+ @ParameterizedTest
+ @MethodSource("logFileOnlyCases")
+ public void
testReadLogFilesOnlyInMergeOnReadTableAndCheckTotalCorruptLogFilesMetric(RecordMergeMode
recordMergeMode, String logDataBlockFormat) throws Exception {
+ Map<String, String> writeConfigs = new
HashMap<>(getCommonConfigs(recordMergeMode, true));
+ writeConfigs.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(),
logDataBlockFormat);
+ // Use InMemoryIndex to generate log only mor table
+ writeConfigs.put("hoodie.index.type", "INMEMORY");
+
+ // use only one partition
Review Comment:
🤖 nit: `DEFAULT_PARTITION_PATH` uses SCREAMING_SNAKE_CASE, which by Java
convention signals a `static final` constant, not a local variable. Could you
rename it to `defaultPartitionPath` (or `partitionPaths`) so readers don't do a
double-take?
##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java:
##########
@@ -296,6 +309,72 @@ public void
testReadLogFilesOnlyInMergeOnReadTable(RecordMergeMode recordMergeMo
}
}
+ @ParameterizedTest
+ @MethodSource("logFileOnlyCases")
+ public void
testReadLogFilesOnlyInMergeOnReadTableAndCheckTotalCorruptLogFilesMetric(RecordMergeMode
recordMergeMode, String logDataBlockFormat) throws Exception {
+ Map<String, String> writeConfigs = new
HashMap<>(getCommonConfigs(recordMergeMode, true));
+ writeConfigs.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(),
logDataBlockFormat);
+ // Use InMemoryIndex to generate log only mor table
+ writeConfigs.put("hoodie.index.type", "INMEMORY");
+
+ // use only one partition
+ final String[] DEFAULT_PARTITION_PATH =
+ {HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH};
+
+ try (HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0xDEEF,
DEFAULT_PARTITION_PATH, new HashMap<>())) {
+ // One commit; reading one file group containing a log file only
+ List<HoodieRecord> initialRecords = dataGen.generateInserts("001", 100);
+ commitToTable(initialRecords, UPSERT.value(), true, writeConfigs,
TRIP_EXAMPLE_SCHEMA);
+ validateOutputFromFileGroupReader(
+ getStorageConf(), getBasePath(), false, 1, recordMergeMode,
+ initialRecords, initialRecords, new String[]{ORDERING_FIELD_NAME});
+ }
+
+ HoodieTableMetaClient metaClient =
HoodieTestUtils.createMetaClient(getStorageConf(), getBasePath());
+ HoodieEngineContext engineContext = new
HoodieLocalEngineContext(getStorageConf());
+ HoodieMetadataConfig metadataConfig =
HoodieMetadataConfig.newBuilder().build();
+ FileSystemViewManager viewManager =
FileSystemViewManager.createViewManager(
+ engineContext,
+ metadataConfig,
+ FileSystemViewStorageConfig.newBuilder().build(),
+ HoodieCommonConfig.newBuilder().build(),
+ mc -> metaClient.getTableFormat().getMetadataFactory().create(
+ engineContext, mc.getStorage(), metadataConfig, getBasePath()));
+ HoodieTableFileSystemView fsView =
+ (HoodieTableFileSystemView) viewManager.getFileSystemView(metaClient);
+ List<String> relativePartitionPathList =
FSUtils.getAllPartitionPaths(engineContext, metaClient, metadataConfig);
+ List<FileSlice> fileSlices =
+ relativePartitionPathList.stream().flatMap(fsView::getAllFileSlices)
+ .collect(Collectors.toList());
+
+ // current partition just only one log
Review Comment:
🤖 nit: `beforeFileSlices` is collected with exactly the same stream
expression as `fileSlices` a few lines above, so this block re-fetches
identical data. Could you replace `beforeFileSlices` with `fileSlices` in the
assertion and drop the re-collection entirely?
##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java:
##########
@@ -296,6 +309,72 @@ public void
testReadLogFilesOnlyInMergeOnReadTable(RecordMergeMode recordMergeMo
}
}
+ @ParameterizedTest
+ @MethodSource("logFileOnlyCases")
+ public void
testReadLogFilesOnlyInMergeOnReadTableAndCheckTotalCorruptLogFilesMetric(RecordMergeMode
recordMergeMode, String logDataBlockFormat) throws Exception {
+ Map<String, String> writeConfigs = new
HashMap<>(getCommonConfigs(recordMergeMode, true));
+ writeConfigs.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(),
logDataBlockFormat);
+ // Use InMemoryIndex to generate log only mor table
+ writeConfigs.put("hoodie.index.type", "INMEMORY");
+
+ // use only one partition
+ final String[] DEFAULT_PARTITION_PATH =
+ {HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH};
+
+ try (HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0xDEEF,
DEFAULT_PARTITION_PATH, new HashMap<>())) {
+ // One commit; reading one file group containing a log file only
+ List<HoodieRecord> initialRecords = dataGen.generateInserts("001", 100);
+ commitToTable(initialRecords, UPSERT.value(), true, writeConfigs,
TRIP_EXAMPLE_SCHEMA);
+ validateOutputFromFileGroupReader(
+ getStorageConf(), getBasePath(), false, 1, recordMergeMode,
+ initialRecords, initialRecords, new String[]{ORDERING_FIELD_NAME});
+ }
+
+ HoodieTableMetaClient metaClient =
HoodieTestUtils.createMetaClient(getStorageConf(), getBasePath());
+ HoodieEngineContext engineContext = new
HoodieLocalEngineContext(getStorageConf());
+ HoodieMetadataConfig metadataConfig =
HoodieMetadataConfig.newBuilder().build();
+ FileSystemViewManager viewManager =
FileSystemViewManager.createViewManager(
+ engineContext,
+ metadataConfig,
+ FileSystemViewStorageConfig.newBuilder().build(),
+ HoodieCommonConfig.newBuilder().build(),
+ mc -> metaClient.getTableFormat().getMetadataFactory().create(
+ engineContext, mc.getStorage(), metadataConfig, getBasePath()));
+ HoodieTableFileSystemView fsView =
+ (HoodieTableFileSystemView) viewManager.getFileSystemView(metaClient);
+ List<String> relativePartitionPathList =
FSUtils.getAllPartitionPaths(engineContext, metaClient, metadataConfig);
+ List<FileSlice> fileSlices =
+ relativePartitionPathList.stream().flatMap(fsView::getAllFileSlices)
+ .collect(Collectors.toList());
+
+ // current partition just only one log
+ List<FileSlice> beforeFileSlices =
+ relativePartitionPathList.stream().flatMap(fsView::getAllFileSlices)
+ .collect(Collectors.toList());
+ assertEquals(beforeFileSlices.size(), 1);
+
+ // overwrite the current log in this partition to mock the log's magic num is broken or log is corrupt
+ HoodieLogFile hoodieLogFile = fileSlices.get(0).getLatestLogFile().get();
+ OutputStream outputStream =
metaClient.getStorage().create(hoodieLogFile.getPath());
+ outputStream.write(0);
+ outputStream.close();
+
+ List<FileSlice> newFileSlices =
+ relativePartitionPathList.stream().flatMap(fsView::getAllFileSlices)
+ .collect(Collectors.toList());
+ assertEquals(newFileSlices.size(), 1);
+
+ TypedProperties props = buildProperties(metaClient, recordMergeMode);
+ Schema avroSchema = new
TableSchemaResolver(metaClient).getTableAvroSchema();
+ HoodieFileGroupReader<RowData> hoodieFileGroupReader =
+ getHoodieFileGroupReader(getStorageConf(), getBasePath(), metaClient,
avroSchema, newFileSlices.get(0), 0, props, false);
+
+ HoodieFileGroupReader.HoodieFileGroupReaderIterator closableIterator
+ = (HoodieFileGroupReader.HoodieFileGroupReaderIterator)
hoodieFileGroupReader.getClosableBufferedRecordIterator();
+
+ assertEquals(closableIterator.getReadStats().getTotalCorruptLogFiles(),1);
Review Comment:
🤖 nit: JUnit's `assertEquals` convention is `(expected, actual)`, but the
arguments here are swapped. Could you flip them to `assertEquals(1,
closableIterator.getReadStats().getTotalCorruptLogFiles())`? That way failure
messages say "expected 1 but was X" rather than the reverse.