This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 9425e5ac8f6  [HUDI-6758] Detecting and skipping Spurious log blocks with MOR reads (#9545)
9425e5ac8f6 is described below

commit 9425e5ac8f67397e408d393bd7f80def58454d3d
Author: Sivabalan Narayanan <n.siv...@gmail.com>
AuthorDate: Tue Aug 29 21:33:27 2023 -0400

    [HUDI-6758] Detecting and skipping Spurious log blocks with MOR reads (#9545)

    - Detect and skip duplicate log blocks written due to task retries.
    - Detection is based on a block sequence number that increases monotonically across rollovers.
---
 .../org/apache/hudi/io/HoodieAppendHandle.java     |  14 +-
 .../table/log/AbstractHoodieLogRecordReader.java   | 169 ++++++++++++++++++---
 .../common/table/log/block/HoodieLogBlock.java     |   2 +-
 .../common/functional/TestHoodieLogFormat.java     | 143 +++++++++++++++--
 4 files changed, 295 insertions(+), 33 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index d0819aa8007..65f79c5147e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -129,6 +129,9 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
   private boolean useWriterSchema = false;
   private Properties recordProperties = new Properties();
+  // The block sequence number is used by the log reader to detect duplicate log blocks added due to Spark task retries.
+  // It always starts at 0 for a given file slice and is incremented by 1 for each rollover and for each delete block.
+  private int blockSequenceNumber = 0;

   /**
    * This is used by log compaction only.
@@ -458,11 +461,11 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
             ? HoodieRecord.RECORD_KEY_METADATA_FIELD
             : hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
-        blocks.add(getBlock(config, pickLogDataBlockFormat(), recordList, header, keyField));
+        blocks.add(getBlock(config, pickLogDataBlockFormat(), recordList, getUpdatedHeader(header, blockSequenceNumber++, taskContextSupplier.getAttemptIdSupplier().get()), keyField));
       }

       if (appendDeleteBlocks && recordsToDelete.size() > 0) {
-        blocks.add(new HoodieDeleteBlock(recordsToDelete.toArray(new DeleteRecord[0]), header));
+        blocks.add(new HoodieDeleteBlock(recordsToDelete.toArray(new DeleteRecord[0]), getUpdatedHeader(header, blockSequenceNumber++, taskContextSupplier.getAttemptIdSupplier().get())));
       }

       if (blocks.size() > 0) {
@@ -632,6 +635,13 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
     }
   }

+  private static Map<HeaderMetadataType, String> getUpdatedHeader(Map<HeaderMetadataType, String> header, int blockSequenceNumber, long attemptNumber) {
+    Map<HeaderMetadataType, String> updatedHeader = new HashMap<>();
+    updatedHeader.putAll(header);
+    // header value format: "<attemptNumber>,<blockSequenceNumber>"
+    updatedHeader.put(HeaderMetadataType.BLOCK_SEQUENCE_NUMBER, attemptNumber + "," + blockSequenceNumber);
+    return updatedHeader;
+  }
+
   private static HoodieLogBlock getBlock(HoodieWriteConfig writeConfig,
                                          HoodieLogBlock.HoodieLogBlockType logDataBlockFormat,
                                          List<HoodieRecord> records,
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
index 7b1e737610b..94bd68e62c4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
@@ -34,6 +34,7 @@ import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.InternalSchemaCache;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.CloseableMappingIterator;
 import org.apache.hudi.common.util.collection.Pair;
@@ -65,6 +66,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;

 import static org.apache.hudi.common.table.log.block.HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK;
+import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.BLOCK_SEQUENCE_NUMBER;
 import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.COMPACTED_BLOCK_TIMES;
 import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.INSTANT_TIME;
 import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME;
@@ -108,8 +110,6 @@ public abstract class AbstractHoodieLogRecordReader {
   private final TypedProperties payloadProps;
   // Log File Paths
   protected final List<String> logFilePaths;
-  // Read Lazily flag
-  private final boolean readBlocksLazily;
   // Reverse reader - Not implemented yet (NA -> Why do we need ?)
   // but present here for plumbing for future implementation
   private final boolean reverseReader;
@@ -174,7 +174,6 @@ public abstract class AbstractHoodieLogRecordReader {
     this.totalLogFiles.addAndGet(logFilePaths.size());
     this.logFilePaths = logFilePaths;
     this.reverseReader = reverseReader;
-    this.readBlocksLazily = readBlocksLazily;
     this.fs = fs;
     this.bufferSize = bufferSize;
     this.instantRange = instantRange;
@@ -224,6 +223,9 @@ public abstract class AbstractHoodieLogRecordReader {

   private void scanInternalV1(Option<KeySpec> keySpecOpt) {
     currentInstantLogBlocks = new ArrayDeque<>();
+    List<HoodieLogBlock> validLogBlockInstants = new ArrayList<>();
+    Map<String, Map<Long, List<Pair<Integer, HoodieLogBlock>>>> blockSequenceMapPerCommit = new HashMap<>();
+
     progress = 0.0f;
     totalLogFiles = new AtomicLong(0);
     totalRollbacks = new AtomicLong(0);
@@ -238,7 +240,7 @@ public abstract class AbstractHoodieLogRecordReader {
       // Iterate over the paths
       logFormatReaderWrapper = new HoodieLogFormatReader(fs,
           logFilePaths.stream().map(logFile -> new HoodieLogFile(new CachingPath(logFile))).collect(Collectors.toList()),
-          readerSchema, readBlocksLazily, reverseReader, bufferSize, shouldLookupRecords(), recordKeyField, internalSchema);
+          readerSchema, true, reverseReader, bufferSize, shouldLookupRecords(), recordKeyField, internalSchema);

       Set<HoodieLogFile> scannedLogFiles = new HashSet<>();
       while (logFormatReaderWrapper.hasNext()) {
@@ -249,6 +251,14 @@ public abstract class AbstractHoodieLogRecordReader {
         // Use the HoodieLogFileReader to iterate through the blocks in the log file
         HoodieLogBlock logBlock = logFormatReaderWrapper.next();
         final String instantTime = logBlock.getLogBlockHeader().get(INSTANT_TIME);
+        final String blockSequenceNumberStr = logBlock.getLogBlockHeader().getOrDefault(BLOCK_SEQUENCE_NUMBER, "");
+        int blockSeqNo = -1;
+        long attemptNo = -1L;
+        if (!StringUtils.isNullOrEmpty(blockSequenceNumberStr)) {
+          // header value format: "<attemptNumber>,<blockSequenceNumber>"
+          String[] parts = blockSequenceNumberStr.split(",");
+          attemptNo = Long.parseLong(parts[0]);
+          blockSeqNo = Integer.parseInt(parts[1]);
+        }
         totalLogBlocks.incrementAndGet();
         if (logBlock.getBlockType() != CORRUPT_BLOCK
             && !HoodieTimeline.compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME), HoodieTimeline.LESSER_THAN_OR_EQUALS, this.latestInstantTime
@@ -271,25 +281,18 @@ public abstract class AbstractHoodieLogRecordReader {
           case HFILE_DATA_BLOCK:
           case AVRO_DATA_BLOCK:
           case PARQUET_DATA_BLOCK:
-            LOG.info("Reading a data block from file " + logFile.getPath() + " at instant "
-                + logBlock.getLogBlockHeader().get(INSTANT_TIME));
-            if (isNewInstantBlock(logBlock) && !readBlocksLazily) {
-              // If this is an avro data block belonging to a different commit/instant,
-              // then merge the last blocks and records into the main result
-              processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt);
-            }
+            LOG.info("Reading a data block from file " + logFile.getPath() + " at instant " + instantTime);
             // store the current block
             currentInstantLogBlocks.push(logBlock);
+            validLogBlockInstants.add(logBlock);
+            updateBlockSequenceTracker(logBlock, instantTime, blockSeqNo, attemptNo, blockSequenceMapPerCommit);
             break;
           case DELETE_BLOCK:
             LOG.info("Reading a delete block from file " + logFile.getPath());
-            if (isNewInstantBlock(logBlock) && !readBlocksLazily) {
-              // If this is a delete data block belonging to a different commit/instant,
-              // then merge the last blocks and records into the main result
-              processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt);
-            }
             // store deletes so they can be rolled back
             currentInstantLogBlocks.push(logBlock);
+            validLogBlockInstants.add(logBlock);
+            updateBlockSequenceTracker(logBlock, instantTime, blockSeqNo, attemptNo, blockSequenceMapPerCommit);
             break;
           case COMMAND_BLOCK:
             // Consider the following scenario
@@ -334,6 +337,25 @@ public abstract class AbstractHoodieLogRecordReader {
                   return false;
                 });

+                // remove the entire entry from the block sequence tracker
+                blockSequenceMapPerCommit.remove(targetInstantForCommandBlock);
+
+                // remove all matching log blocks from the valid list tracked so far
+                validLogBlockInstants = validLogBlockInstants.stream().filter(block -> {
+                  // handle corrupt blocks separately, since they may not have metadata
+                  if (block.getBlockType() == CORRUPT_BLOCK) {
+                    LOG.info("Rolling back the last corrupted log block read in " + logFile.getPath());
+                    return true;
+                  }
+                  if (targetInstantForCommandBlock.contentEquals(block.getLogBlockHeader().get(INSTANT_TIME))) {
+                    // rollback older data block or delete block
+                    LOG.info(String.format("Rolling back an older log block read from %s with instantTime %s",
+                        logFile.getPath(), targetInstantForCommandBlock));
+                    return false;
+                  }
+                  return true;
+                }).collect(Collectors.toList());
+
                 final int numBlocksRolledBack = instantLogBlockSizeBeforeRollback - currentInstantLogBlocks.size();
                 totalRollbacks.addAndGet(numBlocksRolledBack);
                 LOG.info("Number of applied rollback blocks " + numBlocksRolledBack);
@@ -351,6 +373,9 @@ public abstract class AbstractHoodieLogRecordReader {
             totalCorruptBlocks.incrementAndGet();
             // If there is a corrupt block - we will assume that this was the next data block
             currentInstantLogBlocks.push(logBlock);
+            validLogBlockInstants.add(logBlock);
+            // no need to update the block sequence tracker here: it only serves to remove additional/spurious valid log blocks,
+            // and the contents of corrupt blocks are never read anyway.
             break;
           default:
             throw new UnsupportedOperationException("Block type not supported yet");
@@ -358,9 +383,20 @@ public abstract class AbstractHoodieLogRecordReader {
       }
       // merge the last read block when all the blocks are done reading
       if (!currentInstantLogBlocks.isEmpty()) {
-        LOG.info("Merging the final data blocks");
-        processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt);
+        Pair<Boolean, List<HoodieLogBlock>> dedupedLogBlocksInfo = reconcileSpuriousBlocksAndGetValidOnes(validLogBlockInstants, blockSequenceMapPerCommit);
+        if (dedupedLogBlocksInfo.getKey()) {
+          // duplicate log blocks need to be removed: re-create the queue of valid log blocks from the deduped list
+          currentInstantLogBlocks = new ArrayDeque<>();
+          dedupedLogBlocksInfo.getValue().forEach(block -> currentInstantLogBlocks.push(block));
+          LOG.info("Merging the final data blocks");
+          processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt);
+        } else {
+          // no duplicates were found: currentInstantLogBlocks can be used as is
+          LOG.info("Merging the final data blocks");
+          processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt);
+        }
       }
+
       // Done
       progress = 1.0f;
     } catch (IOException e) {
@@ -381,6 +417,101 @@ public abstract class AbstractHoodieLogRecordReader {
     }
   }

+  /**
+   * There could be spurious log blocks due to Spark task retries. So, we use the BLOCK_SEQUENCE_NUMBER in the log block header to detect
+   * such spurious log blocks and return a deduped set of log blocks.
+   *
+   * @param allValidLogBlocks all valid log blocks parsed so far.
+   * @param blockSequenceMapPerCommit map containing block sequence numbers for every commit.
+   * @return a Pair of a boolean and the list of deduped valid log blocks, where a boolean of true means duplicates were detected.
+   */
+  private Pair<Boolean, List<HoodieLogBlock>> reconcileSpuriousBlocksAndGetValidOnes(List<HoodieLogBlock> allValidLogBlocks,
+                                                                                     Map<String, Map<Long, List<Pair<Integer, HoodieLogBlock>>>> blockSequenceMapPerCommit) {

+    boolean dupsFound = blockSequenceMapPerCommit.values().stream().anyMatch(perCommitBlockList -> perCommitBlockList.size() > 1);
+    if (dupsFound) {
+      // duplicates are found; remove the duplicate log blocks.
+      for (Map.Entry<String, Map<Long, List<Pair<Integer, HoodieLogBlock>>>> entry : blockSequenceMapPerCommit.entrySet()) {
+        Map<Long, List<Pair<Integer, HoodieLogBlock>>> perCommitBlockSequences = entry.getValue();
+        if (perCommitBlockSequences.size() > 1) {
+          // only commits that have more than one sequence need deduping.
+          // pick the attempt that produced the most blocks and treat it as the complete one.
+          int maxSequenceCount = -1;
+          int maxAttemptNo = -1;
+          for (Map.Entry<Long, List<Pair<Integer, HoodieLogBlock>>> perAttemptEntries : perCommitBlockSequences.entrySet()) {
+            Long attemptNo = perAttemptEntries.getKey();
+            int size = perAttemptEntries.getValue().size();
+            if (maxSequenceCount < size) {
+              maxSequenceCount = size;
+              maxAttemptNo = Math.toIntExact(attemptNo);
+            }
+          }
+          // for every other attempt (!= maxAttemptNo), remove the corresponding log blocks from allValidLogBlocks
+          for (Map.Entry<Long, List<Pair<Integer, HoodieLogBlock>>> perAttemptEntries : perCommitBlockSequences.entrySet()) {
+            Long attemptNo = perAttemptEntries.getKey();
+            if (maxAttemptNo != attemptNo) {
+              List<HoodieLogBlock> logBlocksToRemove = perCommitBlockSequences.get(attemptNo).stream().map(Pair::getValue).collect(Collectors.toList());
+              allValidLogBlocks.removeAll(logBlocksToRemove);
+            }
+          }
+        }
+      }
+      return Pair.of(true, allValidLogBlocks);
+    } else {
+      return Pair.of(false, allValidLogBlocks);
+    }
+  }
+
+  /**
+   * Updates the map tracking block sequence numbers. The map structure is:
+   * Map<String, Map<Long, List<Pair<Integer, HoodieLogBlock>>>> blockSequenceMapPerCommit
+   * Key: commit time.
+   * Value: a map of the different write attempts for the commit of interest; each list entry holds the block sequence number and the
+   * corresponding HoodieLogBlock.
+   *
+   * For example, if there were two attempts for a file slice while writing (due to Spark task retries), the map might look as follows:
+   * key: commit1
+   * value : {
+   *   0L = List = { {0, lb1}, {1, lb2} },
+   *   1L = List = { {0, lb3}, {1, lb4}, {2, lb5} }
+   * }
+   * Meaning: for commit1, there were two write attempts from the append handle. In the first attempt, lb1 and lb2 were added; in the
+   * second attempt, lb3, lb4 and lb5 were added.
+   * We keep populating this map for the entire scan and finally detect spurious log blocks and ignore them.
+   * In most cases, there will be just one sequence per commit.
+   *
+   * @param logBlock log block of interest to be added.
+   * @param instantTime commit time of interest.
+   * @param blockSeqNo block sequence number.
+   * @param attemptNo attempt number of the write that produced the block.
+   * @param blockSequenceMapPerCommit map tracking per commit block sequences.
+   */
+  private void updateBlockSequenceTracker(HoodieLogBlock logBlock, String instantTime, int blockSeqNo, long attemptNo,
+                                          Map<String, Map<Long, List<Pair<Integer, HoodieLogBlock>>>> blockSequenceMapPerCommit) {
+    if (blockSeqNo != -1 && attemptNo != -1) { // update the block sequence tracker for log blocks that carry the header.
+      Map<Long, List<Pair<Integer, HoodieLogBlock>>> curCommitBlockMap = blockSequenceMapPerCommit.computeIfAbsent(instantTime, entry -> new HashMap<>());
+      // append to the list for this attempt, creating the entry on first use
+      curCommitBlockMap.computeIfAbsent(attemptNo, attempt -> new ArrayList<>()).add(Pair.of(blockSeqNo, logBlock));
+    } else {
+      // all older blocks (written before this header was introduced) are considered valid;
+      // there should be only one list, under attempt 0, for such commits.
+      Map<Long, List<Pair<Integer, HoodieLogBlock>>> curCommitBlockMap = blockSequenceMapPerCommit.computeIfAbsent(instantTime, entry -> new HashMap<>());
+      curCommitBlockMap.computeIfAbsent(0L, attempt -> new ArrayList<>()).add(Pair.of(blockSeqNo, logBlock));
+    }
+  }
+
   private void scanInternalV2(Option<KeySpec> keySpecOption, boolean skipProcessingBlocks) {
     currentInstantLogBlocks = new ArrayDeque<>();
     progress = 0.0f;
@@ -397,7 +528,7 @@ public abstract class AbstractHoodieLogRecordReader {
       // Iterate over the paths
       logFormatReaderWrapper = new HoodieLogFormatReader(fs,
           logFilePaths.stream().map(logFile -> new HoodieLogFile(new CachingPath(logFile))).collect(Collectors.toList()),
-          readerSchema, readBlocksLazily, reverseReader, bufferSize, shouldLookupRecords(), recordKeyField, internalSchema);
+          readerSchema, true, reverseReader, bufferSize, shouldLookupRecords(), recordKeyField, internalSchema);

       /**
        * Scanning log blocks and placing the compacted blocks at the right place require two traversals.
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
index 3ac161cbe1c..efec05c857c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
@@ -168,7 +168,7 @@ public abstract class HoodieLogBlock {
    * new enums at the end.
    */
   public enum HeaderMetadataType {
-    INSTANT_TIME, TARGET_INSTANT_TIME, SCHEMA, COMMAND_BLOCK_TYPE, COMPACTED_BLOCK_TIMES, RECORD_POSITIONS
+    INSTANT_TIME, TARGET_INSTANT_TIME, SCHEMA, COMMAND_BLOCK_TYPE, COMPACTED_BLOCK_TIMES, RECORD_POSITIONS, BLOCK_SEQUENCE_NUMBER
   }

   /**
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index 9da97a07333..f0ca8ef9944 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -108,6 +108,7 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;

+import static java.util.stream.Collectors.toList;
 import static org.apache.hudi.common.testutils.HoodieTestUtils.getJavaVersion;
 import static org.apache.hudi.common.testutils.HoodieTestUtils.shouldUseExternalHdfs;
 import static org.apache.hudi.common.testutils.HoodieTestUtils.useExternalHdfs;
@@ -528,7 +529,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     HoodieLogBlock nextBlock = reader.next();
     HoodieDataBlock dataBlockRead = (HoodieDataBlock) nextBlock;
     List<IndexedRecord> recordsRead1 = getRecords(dataBlockRead);
-    assertEquals(copyOfRecords1.size(),recordsRead1.size(),
+    assertEquals(copyOfRecords1.size(), recordsRead1.size(),
         "Read records size should be equal to the written records size");
     assertEquals(copyOfRecords1, recordsRead1,
         "Both records lists should be the same. (ordering guaranteed)");
@@ -687,6 +688,108 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     scanner.close();
   }

+  @Test
+  public void testBasicAppendsWithBlockSeqNos() throws IOException, URISyntaxException, InterruptedException {
+    testAppendsWithSpuriousLogBlocks(true, (partitionPath, schema, genRecords, numFiles, enableBlockSeqNos) -> {
+      return writeLogFiles(partitionPath, schema, genRecords, numFiles, enableBlockSeqNos);
+    });
+  }
+
+  @Test
+  public void testAppendsWithSpuriousLogBlocksExactDup() throws IOException, URISyntaxException, InterruptedException {
+    testAppendsWithSpuriousLogBlocks(true, (partitionPath, schema, genRecords, numFiles, enableBlockSeqNos) -> {
+      Set<HoodieLogFile> logFiles = writeLogFiles(partitionPath, schema, genRecords, numFiles, enableBlockSeqNos);
+      // re-add the same records again
+      logFiles.addAll(writeLogFiles(partitionPath, schema, genRecords, numFiles, enableBlockSeqNos));
+      return logFiles;
+    });
+  }
+
+  @Test
+  public void testAppendsWithSpuriousLogBlocksFirstAttemptPartial() throws IOException, URISyntaxException, InterruptedException {
+    testAppendsWithSpuriousLogBlocks(true, (partitionPath, schema, genRecords, numFiles, enableBlockSeqNos) -> {
+      Set<HoodieLogFile> logFiles = writeLogFiles(partitionPath, schema, genRecords, numFiles, enableBlockSeqNos);
+      // remove the 4th log file to simulate a partial failure in the 1st attempt
+      List<HoodieLogFile> logFileList = new ArrayList<>(logFiles);
+      logFiles.remove(logFileList.get(logFileList.size() - 1));
+      // re-add the same records again
+      logFiles.addAll(writeLogFiles(partitionPath, schema, genRecords, numFiles, enableBlockSeqNos));
+      return logFiles;
+    });
+  }
+
+  @Test
+  public void testAppendsWithSpuriousLogBlocksSecondAttemptPartial() throws IOException, URISyntaxException, InterruptedException {
+    testAppendsWithSpuriousLogBlocks(true, (partitionPath, schema, genRecords, numFiles, enableBlockSeqNos) -> {
+      Set<HoodieLogFile> logFiles = writeLogFiles(partitionPath, schema, genRecords, numFiles, enableBlockSeqNos);
+      // re-add the same records again
+      Set<HoodieLogFile> logFilesSet2 = writeLogFiles(partitionPath, schema, genRecords, numFiles, enableBlockSeqNos);
+      // remove the 4th log file to simulate a partial failure in the 2nd attempt
+      List<HoodieLogFile> logFileList2 = new ArrayList<>(logFilesSet2);
+      logFilesSet2.remove(logFileList2.get(logFileList2.size() - 1));
+      logFiles.addAll(logFilesSet2);
+      return logFiles;
+    });
+  }
+
+  private void testAppendsWithSpuriousLogBlocks(
+      boolean enableOptimizedLogBlocksScan,
+      Function5<Set<HoodieLogFile>, Path, Schema, List<IndexedRecord>, Integer, Boolean> logGenFunc)
+      throws IOException, URISyntaxException, InterruptedException {

+    Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
+    SchemaTestUtil testUtil = new SchemaTestUtil();
+    List<IndexedRecord> genRecords = testUtil.generateHoodieTestRecords(0, 400);
+    Set<HoodieLogFile> logFiles = logGenFunc.apply(partitionPath, schema, genRecords, 4, true);

+    FileCreateUtils.createDeltaCommit(basePath, "100", fs);

+    HoodieMergedLogRecordScanner scanner = getLogRecordScanner(logFiles, schema, enableOptimizedLogBlocksScan);
+    // even though duplicate records were written, only one set of blocks should be parsed as valid, thanks to the block sequence reconciliation
+    assertRecordsAndCloseScanner(scanner, genRecords, schema);
+  }
+
+  private void assertRecordsAndCloseScanner(HoodieMergedLogRecordScanner scanner, List<IndexedRecord> genRecords, Schema schema) throws IOException {
+    List<IndexedRecord> scannedRecords = new ArrayList<>();
+    for (HoodieRecord record : scanner) {
+      scannedRecords.add((IndexedRecord)
+          ((HoodieAvroRecord) record).getData().getInsertValue(schema).get());
+    }

+    assertEquals(sort(genRecords), sort(scannedRecords),
+        "Scanner records count should be the same as appended records");
+    scanner.close();
+  }
+
+  private HoodieMergedLogRecordScanner getLogRecordScanner(Set<HoodieLogFile> logFiles, Schema schema,
+                                                           boolean enableOptimizedLogBlocksScan) {

+    // scan all log blocks (across multiple log files)
+    return HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(fs)
+        .withBasePath(basePath)
+        .withLogFilePaths(
+            logFiles.stream().sorted(HoodieLogFile.getLogFileComparator())
+                .map(l -> l.getPath().toString()).collect(toList()))
+        .withReaderSchema(schema)
+        .withLatestInstantTime("100")
+        .withMaxMemorySizeInBytes(10240L)
+        .withReadBlocksLazily(true)
+        .withReverseReader(false)
+        .withBufferSize(BUFFER_SIZE)
+        .withSpillableMapBasePath(spillableBasePath)
+        .withDiskMapType(ExternalSpillableMap.DiskMapType.BITCASK)
+        .withBitCaskDiskMapCompressionEnabled(true)
+        .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
+        .build();
+  }
+
+  @FunctionalInterface
+  public interface Function5<R, T1, T2, T3, T4, T5> {

+    R apply(T1 v1, T2 v2, T3 v3, T4 v4, T5 v5) throws IOException, InterruptedException;
+  }
+
   @ParameterizedTest
   @MethodSource("testArguments")
   public void testBasicAppendAndPartialScanning(ExternalSpillableMap.DiskMapType diskMapType,
@@ -1316,7 +1419,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
     scanner.forEach(s -> {
       try {
-        if (!((HoodieRecordPayload)s.getData()).getInsertValue(schema).isPresent()) {
+        if (!((HoodieRecordPayload) s.getData()).getInsertValue(schema).isPresent()) {
           emptyPayloads.add(true);
         }
       } catch (IOException io) {
@@ -1422,7 +1525,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102");
     HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deletedKeys.stream().map(deletedKey ->
-            DeleteRecord.create(deletedKey.getRecordKey(), deletedKey.getPartitionPath()))
+        DeleteRecord.create(deletedKey.getRecordKey(), deletedKey.getPartitionPath()))
         .collect(Collectors.toList()).toArray(new DeleteRecord[0]), header);
     writer.appendBlock(deleteBlock);
@@ -1443,7 +1546,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     deleteBlockHeader.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102");
     deleteBlock = new HoodieDeleteBlock(
         deletedKeys.stream().map(deletedKey ->
-            DeleteRecord.create(deletedKey.getRecordKey(), deletedKey.getPartitionPath()))
+        DeleteRecord.create(deletedKey.getRecordKey(), deletedKey.getPartitionPath()))
         .collect(Collectors.toList()).toArray(new DeleteRecord[0]), deleteBlockHeader);
     writer.appendBlock(deleteBlock);
@@ -1586,7 +1689,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     scanner.forEach(s -> readKeys.add(s.getRecordKey()));
     scanner.forEach(s -> {
       try {
-        if (!((HoodieRecordPayload)s.getData()).getInsertValue(schema).isPresent()) {
+        if (!((HoodieRecordPayload) s.getData()).getInsertValue(schema).isPresent()) {
           emptyPayloadKeys.add(s.getRecordKey());
         }
       } catch (IOException io) {
@@ -2268,7 +2371,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
         .sorted()
         .collect(Collectors.toList());
     List<String> validBlockInstants = scanner.getValidBlockInstants();
-    List<String> expectedBlockInstants = Arrays.asList("108","105", "104");
+    List<String> expectedBlockInstants = Arrays.asList("108", "105", "104");
     assertEquals(expectedBlockInstants, validBlockInstants);
     Collections.sort(readKeys);
     assertEquals(expectedRecords, readKeys, "Record keys read should be exactly same.");
@@ -2523,7 +2626,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     HoodieLogFile logFile = new HoodieLogFile(writer.getLogFile().getPath(),
         fs.getFileStatus(writer.getLogFile().getPath()).getLen());
     try (HoodieLogFileReader reader =
-             new HoodieLogFileReader(fs, logFile, schema, BUFFER_SIZE, readBlocksLazily, true)) {
+        new HoodieLogFileReader(fs, logFile, schema, BUFFER_SIZE, readBlocksLazily, true)) {

       assertTrue(reader.hasPrev(), "Last block should be available");
       HoodieLogBlock block = reader.prev();
@@ -2656,7 +2759,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     BenchmarkCounter.initCounterFromReporter(HadoopMapRedUtils.createTestReporter(), fs.getConf());

     // NOTE: Have to use this ugly hack since List generic is not covariant in its type param
-    HoodieDataBlock dataBlock = getDataBlock(dataBlockType, (List<IndexedRecord>)(List) records, header);
+    HoodieDataBlock dataBlock = getDataBlock(dataBlockType, (List<IndexedRecord>) (List) records, header);
     writer.appendBlock(dataBlock);
     writer.close();
@@ -2772,6 +2875,15 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
                                                  Schema schema,
                                                  List<IndexedRecord> records,
                                                  int numFiles) throws IOException, InterruptedException {
+    return writeLogFiles(partitionPath, schema, records, numFiles, false);
+  }
+
+  private static Set<HoodieLogFile> writeLogFiles(Path partitionPath,
+                                                  Schema schema,
+                                                  List<IndexedRecord> records,
+                                                  int numFiles,
+                                                  boolean enableBlockSequenceNumbers) throws IOException, InterruptedException {
+    int blockSeqNo = 0;
     Writer writer =
         HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
             .withSizeThreshold(1024).withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
@@ -2793,8 +2905,10 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
       List<IndexedRecord> targetRecords = records.subList(offset, offset + targetRecordsCount);

       logFiles.add(writer.getLogFile());
+      if (enableBlockSequenceNumbers) {
+        header = getUpdatedHeader(header, blockSeqNo++);
+      }
       writer.appendBlock(getDataBlock(DEFAULT_DATA_BLOCK_TYPE, targetRecords, header));
-      filesWritten++;
     }

@@ -2803,6 +2917,13 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     return logFiles;
   }

+  private static Map<HeaderMetadataType, String> getUpdatedHeader(Map<HeaderMetadataType, String> header, int blockSequenceNumber) {
+    Map<HeaderMetadataType, String> updatedHeader = new HashMap<>();
+    updatedHeader.putAll(header);
+    // use the "<attemptNumber>,<blockSequenceNumber>" format the log reader expects; the tests use a fixed attempt number of 1
+    updatedHeader.put(HeaderMetadataType.BLOCK_SEQUENCE_NUMBER, "1," + blockSequenceNumber);
+    return updatedHeader;
+  }
+
   /**
    * Utility to convert the given iterator to a List.
    */
@@ -2860,8 +2981,8 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
   }

   private void checkLogBlocksAndKeys(String latestInstantTime, Schema schema, boolean readBlocksLazily,
-      ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled, boolean enableOptimizedLogBlocksScan, int expectedTotalRecords,
-      int expectedTotalKeys, Option<Set<String>> expectedKeys) throws IOException {
+                                     ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled, boolean enableOptimizedLogBlocksScan, int expectedTotalRecords,
+                                     int expectedTotalKeys, Option<Set<String>> expectedKeys) throws IOException {
     List<String> allLogFiles = FSUtils.getAllLogFiles(fs, partitionPath, "test-fileid1", HoodieLogFile.DELTA_EXTENSION, "100")
         .map(s -> s.getPath().toString()).collect(Collectors.toList());
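
---

For readers tracing the mechanism: below is a minimal, self-contained sketch of the round-trip this commit relies on — the append handle encodes "<attemptNumber>,<blockSequenceNumber>" into each log block header, and the reader keeps, per commit, only the blocks of the attempt that produced the most of them. The class and helper names here (BlockSequenceDemo, encode, decode) are illustrative only and are not Hudi APIs.

// Illustrative sketch of the block-sequence mechanism; not part of the commit above.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BlockSequenceDemo {

  // Writer side: encode the attempt number and block sequence number into a single header value.
  static String encode(long attemptNumber, int blockSequenceNumber) {
    return attemptNumber + "," + blockSequenceNumber;
  }

  // Reader side: decode the header value; blocks written by older writers carry no such header,
  // which is signalled here with {-1, -1} (such blocks are always treated as valid).
  static long[] decode(String headerValue) {
    if (headerValue == null || headerValue.isEmpty()) {
      return new long[] {-1L, -1L};
    }
    String[] parts = headerValue.split(",");
    return new long[] {Long.parseLong(parts[0]), Long.parseLong(parts[1])};
  }

  public static void main(String[] args) {
    // Simulate two write attempts for the same commit: attempt 0 failed after two blocks,
    // attempt 1 completed with three blocks (data blocks plus rollover/delete blocks).
    Map<Long, List<Integer>> seqNosPerAttempt = new HashMap<>();
    for (String headerValue : new String[] {"0,0", "0,1", "1,0", "1,1", "1,2"}) {
      long[] decoded = decode(headerValue);
      seqNosPerAttempt.computeIfAbsent(decoded[0], a -> new ArrayList<>()).add((int) decoded[1]);
    }

    // Reconciliation rule: when a commit has blocks from more than one attempt,
    // keep only the attempt that produced the most blocks and treat the rest as spurious.
    long winningAttempt = -1L;
    int maxBlocks = -1;
    for (Map.Entry<Long, List<Integer>> e : seqNosPerAttempt.entrySet()) {
      if (e.getValue().size() > maxBlocks) {
        maxBlocks = e.getValue().size();
        winningAttempt = e.getKey();
      }
    }
    System.out.println("blocks from attempt " + winningAttempt + " are kept"); // attempt 1
  }
}

For exact duplicates (two attempts with equal block counts), the size rule still keeps only a single attempt's blocks, which is sufficient because their contents are identical.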