This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit ecd39e3ad76b92a4f1dd0e18beed146736dc0592
Author: XuQianJin-Stars <forwar...@apache.com>
AuthorDate: Wed Nov 2 12:26:49 2022 +0800

    add log to print scanInternal's logFilePath
---
 .../table/log/AbstractHoodieLogRecordReader.java | 41 ++++++++++++----------
 1 file changed, 22 insertions(+), 19 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
index 4566b1f5cd..eaca33ddcf 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
@@ -72,7 +72,7 @@ import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlo
 /**
  * Implements logic to scan log blocks and expose valid and deleted log records to subclass implementation. Subclass is
  * free to either apply merging or expose raw data back to the caller.
- *
+ * <p>
  * NOTE: If readBlockLazily is turned on, does not merge, instead keeps reading log blocks and merges everything at once
  * This is an optimization to avoid seek() back and forth to read new block (forward seek()) and lazily read content of
  * seen block (reverse and forward seek()) during merge | | Read Block 1 Metadata | | Read Block 1 Data | | | Read Block
@@ -208,6 +208,8 @@ public abstract class AbstractHoodieLogRecordReader {
     HoodieTimeline commitsTimeline = this.hoodieTableMetaClient.getCommitsTimeline();
     HoodieTimeline completedInstantsTimeline = commitsTimeline.filterCompletedInstants();
     HoodieTimeline inflightInstantsTimeline = commitsTimeline.filterInflights();
+    HoodieLogFile logFile;
+    Path logFilePath = null;
     try {
       // Get the key field based on populate meta fields config
       // and the table type
@@ -216,12 +218,13 @@ public abstract class AbstractHoodieLogRecordReader {
       // Iterate over the paths
       boolean enableRecordLookups = !forceFullScan;
       logFormatReaderWrapper = new HoodieLogFormatReader(fs,
-          logFilePaths.stream().map(logFile -> new HoodieLogFile(new Path(logFile))).collect(Collectors.toList()),
+          logFilePaths.stream().map(log -> new HoodieLogFile(new Path(log))).collect(Collectors.toList()),
           readerSchema, readBlocksLazily, reverseReader, bufferSize, enableRecordLookups, keyField, internalSchema);
 
       Set<HoodieLogFile> scannedLogFiles = new HashSet<>();
       while (logFormatReaderWrapper.hasNext()) {
-        HoodieLogFile logFile = logFormatReaderWrapper.getLogFile();
+        logFile = logFormatReaderWrapper.getLogFile();
+        logFilePath = logFile.getPath();
         LOG.info("Scanning log file " + logFile);
         scannedLogFiles.add(logFile);
         totalLogFiles.set(scannedLogFiles.size());
@@ -250,7 +253,7 @@ public abstract class AbstractHoodieLogRecordReader {
           case HFILE_DATA_BLOCK:
           case AVRO_DATA_BLOCK:
           case PARQUET_DATA_BLOCK:
-            LOG.info("Reading a data block from file " + logFile.getPath() + " at instant "
+            LOG.info("Reading a data block from file " + logFilePath + " at
instant "
                 + logBlock.getLogBlockHeader().get(INSTANT_TIME));
             if (isNewInstantBlock(logBlock) && !readBlocksLazily) {
               // If this is an avro data block belonging to a different commit/instant,
@@ -261,7 +264,7 @@ public abstract class AbstractHoodieLogRecordReader {
             currentInstantLogBlocks.push(logBlock);
             break;
           case DELETE_BLOCK:
-            LOG.info("Reading a delete block from file " + logFile.getPath());
+            LOG.info("Reading a delete block from file " + logFilePath);
             if (isNewInstantBlock(logBlock) && !readBlocksLazily) {
               // If this is a delete data block belonging to a different commit/instant,
               // then merge the last blocks and records into the main result
@@ -283,7 +286,7 @@ public abstract class AbstractHoodieLogRecordReader {
             // written per ingestion batch for a file but in reality we need to rollback (B1 & B2)
             // The following code ensures the same rollback block (R1) is used to rollback
             // both B1 & B2
-            LOG.info("Reading a command block from file " + logFile.getPath());
+            LOG.info("Reading a command block from file " + logFilePath);
             // This is a command block - take appropriate action based on the command
             HoodieCommandBlock commandBlock = (HoodieCommandBlock) logBlock;
             String targetInstantForCommandBlock =
@@ -302,23 +305,23 @@ public abstract class AbstractHoodieLogRecordReader {
                   HoodieLogBlock lastBlock = currentInstantLogBlocks.peek();
                   // handle corrupt blocks separately since they may not have metadata
                   if (lastBlock.getBlockType() == CORRUPT_BLOCK) {
-                    LOG.info("Rolling back the last corrupted log block read in " + logFile.getPath());
+                    LOG.info("Rolling back the last corrupted log block read in " + logFilePath);
                     currentInstantLogBlocks.pop();
                     numBlocksRolledBack++;
                   } else if (targetInstantForCommandBlock.contentEquals(lastBlock.getLogBlockHeader().get(INSTANT_TIME))) {
                     // rollback last data block or delete block
-                    LOG.info("Rolling back the last log block read in " + logFile.getPath());
+                    LOG.info("Rolling back the last log block read in " + logFilePath);
                     currentInstantLogBlocks.pop();
                     numBlocksRolledBack++;
                   } else if (!targetInstantForCommandBlock
                       .contentEquals(currentInstantLogBlocks.peek().getLogBlockHeader().get(INSTANT_TIME))) {
                     // invalid or extra rollback block
                     LOG.warn("TargetInstantTime " + targetInstantForCommandBlock
-                        + " invalid or extra rollback command block in " + logFile.getPath());
+                        + " invalid or extra rollback command block in " + logFilePath);
                     break;
                   } else {
                     // this should not happen ideally
-                    LOG.warn("Unable to apply rollback command block in " + logFile.getPath());
+                    LOG.warn("Unable to apply rollback command block in " + logFilePath);
                   }
                 }
                 LOG.info("Number of applied rollback blocks " + numBlocksRolledBack);
@@ -328,7 +331,7 @@ public abstract class AbstractHoodieLogRecordReader {
             }
             break;
           case CORRUPT_BLOCK:
-            LOG.info("Found a corrupt block in " + logFile.getPath());
+            LOG.info("Found a corrupt block in " + logFilePath);
             totalCorruptBlocks.incrementAndGet();
             // If there is a corrupt block - we will assume that this was the next data block
             currentInstantLogBlocks.push(logBlock);
@@ -345,11 +348,11 @@ public abstract class AbstractHoodieLogRecordReader {
       // Done
       progress = 1.0f;
     } catch (IOException e) {
-      LOG.error("Got IOException when reading log file", e);
-      throw new HoodieIOException("IOException when reading log file ", e);
+      LOG.error("Got IOException when reading log file: " + logFilePath, e);
+      throw new HoodieIOException("IOException when reading log file: " + logFilePath, e);
     } catch (Exception e) {
-      LOG.error("Got exception when reading log file", e);
-      throw new HoodieException("Exception when reading log file ", e);
+      LOG.error("Got exception when reading log file: " + logFilePath, e);
+      throw new HoodieException("Exception when reading log file: " + logFilePath, e);
     } finally {
       try {
         if (null != logFormatReaderWrapper) {
@@ -423,10 +426,10 @@ public abstract class AbstractHoodieLogRecordReader {
    * @return HoodieRecord created from the IndexedRecord
    */
   protected
HoodieAvroRecord<?> createHoodieRecord(final IndexedRecord rec, final HoodieTableConfig hoodieTableConfig,
-                                               final String payloadClassFQN, final String preCombineField,
-                                               final boolean withOperationField,
-                                               final Option<Pair<String, String>> simpleKeyGenFields,
-                                               final Option<String> partitionName) {
+                                                   final String payloadClassFQN, final String preCombineField,
+                                                   final boolean withOperationField,
+                                                   final Option<Pair<String, String>> simpleKeyGenFields,
+                                                   final Option<String> partitionName) {
     if (this.populateMetaFields) {
       return SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) rec, payloadClassFQN,
           preCombineField, withOperationField);