This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch release-0.14.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit bba95305a073b5ffe94fb579b8a525fd92d54294
Author: Sivabalan Narayanan <n.siv...@gmail.com>
AuthorDate: Sun Sep 10 14:11:49 2023 -0400

    [HUDI-6758] Fixing deducing spurious log blocks due to spark retries (#9611)

    - We attempted a fix with #9545 to avoid reading spurious log blocks on the
      reader side. End-to-end testing of that patch surfaced some gaps:
      specifically, the attempt ID we obtained from taskContextSupplier did not
      refer to the task's attempt number, so this patch fixes that. Tested end
      to end by simulating Spark retries and spurious log blocks; the reader
      detects them and ignores the duplicate copies of log blocks.
---
 .../org/apache/hudi/io/HoodieAppendHandle.java     | 22 ++++-
 .../org/apache/hudi/DummyTaskContextSupplier.java  |  5 ++
 .../hudi/client/FlinkTaskContextSupplier.java      |  5 ++
 .../java/org/apache/hudi/io/FlinkAppendHandle.java |  4 +
 .../client/common/JavaTaskContextSupplier.java     |  6 ++
 .../testutils/HoodieJavaClientTestHarness.java     |  5 ++
 .../hudi/client/SparkTaskContextSupplier.java      |  6 ++
 .../common/engine/LocalTaskContextSupplier.java    |  6 ++
 .../hudi/common/engine/TaskContextSupplier.java    |  5 ++
 .../table/log/AbstractHoodieLogRecordReader.java   | 95 ++++++++++++++--------
 .../common/table/log/block/HoodieLogBlock.java     |  2 +-
 .../common/functional/TestHoodieLogFormat.java     |  2 +-
 12 files changed, 123 insertions(+), 40 deletions(-)
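[Editor's note] The crux of this fix is which of Spark's two task-attempt identifiers gets written into the log block header. A minimal standalone sketch of the difference follows; the class and method names are illustrative and not part of the commit:

    import org.apache.spark.TaskContext;

    // Editor's sketch (not part of the commit): Spark's two "attempt" APIs.
    // taskAttemptId() is unique across all task attempts in the application,
    // so two attempts of the same task get unrelated values. attemptNumber()
    // starts at 0 and increases by 1 on each retry of this particular task,
    // which is the semantic a duplicate-block detector needs.
    public class AttemptInfoDemo {
      public static void printAttemptInfo() {
        TaskContext ctx = TaskContext.get(); // null outside a running Spark task
        if (ctx != null) {
          System.out.println("taskAttemptId=" + ctx.taskAttemptId()
              + ", attemptNumber=" + ctx.attemptNumber());
        }
      }
    }

Because taskAttemptId() values from two attempts of the same task are unrelated, grouping blocks by it can never line up retries of one task; attemptNumber() can, which is why the writer below switches to it.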
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 65f79c5147e..ca081fce60f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -54,6 +54,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieAppendException;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieUpsertException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.avro.Schema;
@@ -132,6 +133,8 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
   // Block Sequence number will be used to detect duplicate log blocks(by log reader) added due to spark task retries.
   // It should always start with 0 for a given file slice. for roll overs and delete blocks, we increment by 1.
   private int blockSequenceNumber = 0;
+  // On task failures, a given task could be retried. So, this attempt number will track the number of attempts.
+  private int attemptNumber = 0;
 
   /**
    * This is used by log compaction only.
@@ -143,6 +146,7 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
     this.useWriterSchema = true;
     this.isLogCompaction = true;
     this.header.putAll(header);
+    this.attemptNumber = taskContextSupplier.getAttemptNumberSupplier().get();
   }
 
   public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
@@ -153,6 +157,7 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
     this.sizeEstimator = new DefaultSizeEstimator();
     this.statuses = new ArrayList<>();
     this.recordProperties.putAll(config.getProps());
+    this.attemptNumber = taskContextSupplier.getAttemptNumberSupplier().get();
   }
 
   public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
@@ -461,11 +466,13 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
           ? HoodieRecord.RECORD_KEY_METADATA_FIELD
           : hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
 
-      blocks.add(getBlock(config, pickLogDataBlockFormat(), recordList, getUpdatedHeader(header, blockSequenceNumber++, taskContextSupplier.getAttemptIdSupplier().get()), keyField));
+      blocks.add(getBlock(config, pickLogDataBlockFormat(), recordList, getUpdatedHeader(header, blockSequenceNumber++, attemptNumber, config,
+          addBlockIdentifier()), keyField));
     }
 
     if (appendDeleteBlocks && recordsToDelete.size() > 0) {
-      blocks.add(new HoodieDeleteBlock(recordsToDelete.toArray(new DeleteRecord[0]), getUpdatedHeader(header, blockSequenceNumber++, taskContextSupplier.getAttemptIdSupplier().get())));
+      blocks.add(new HoodieDeleteBlock(recordsToDelete.toArray(new DeleteRecord[0]), getUpdatedHeader(header, blockSequenceNumber++, attemptNumber, config,
+          addBlockIdentifier())));
     }
 
     if (blocks.size() > 0) {
@@ -562,6 +569,10 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
     return true;
   }
 
+  protected boolean addBlockIdentifier() {
+    return true;
+  }
+
   private void writeToBuffer(HoodieRecord<T> record) {
     if (!partitionPath.equals(record.getPartitionPath())) {
       HoodieUpsertException failureEx = new HoodieUpsertException("mismatched partition path, record partition: "
@@ -635,10 +646,13 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
     }
   }
 
-  private static Map<HeaderMetadataType, String> getUpdatedHeader(Map<HeaderMetadataType, String> header, int blockSequenceNumber, long attemptNumber) {
+  private static Map<HeaderMetadataType, String> getUpdatedHeader(Map<HeaderMetadataType, String> header, int blockSequenceNumber, long attemptNumber,
+                                                                  HoodieWriteConfig config, boolean addBlockIdentifier) {
     Map<HeaderMetadataType, String> updatedHeader = new HashMap<>();
     updatedHeader.putAll(header);
-    updatedHeader.put(HeaderMetadataType.BLOCK_SEQUENCE_NUMBER, String.valueOf(attemptNumber) + "," + String.valueOf(blockSequenceNumber));
+    if (addBlockIdentifier && !HoodieTableMetadata.isMetadataTable(config.getBasePath())) { // add block sequence numbers only for data table.
+      updatedHeader.put(HeaderMetadataType.BLOCK_IDENTIFIER, String.valueOf(attemptNumber) + "," + String.valueOf(blockSequenceNumber));
+    }
     return updatedHeader;
   }
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/DummyTaskContextSupplier.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/DummyTaskContextSupplier.java
index d2c07e35509..d87b6147302 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/DummyTaskContextSupplier.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/DummyTaskContextSupplier.java
@@ -45,4 +45,9 @@ public class DummyTaskContextSupplier extends TaskContextSupplier {
   public Option<String> getProperty(EngineProperty prop) {
     return null;
   }
+
+  @Override
+  public Supplier<Integer> getAttemptNumberSupplier() {
+    return null;
+  }
 }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/FlinkTaskContextSupplier.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/FlinkTaskContextSupplier.java
index aab248fc3cf..03c835c5553 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/FlinkTaskContextSupplier.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/FlinkTaskContextSupplier.java
@@ -62,4 +62,9 @@ public class FlinkTaskContextSupplier extends TaskContextSupplier {
     return Option.empty();
   }
 
+  @Override
+  public Supplier<Integer> getAttemptNumberSupplier() {
+    return () -> -1;
+  }
+
 }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
index 4b56d6a442c..3dc76ed435e 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
@@ -92,6 +92,10 @@ public class FlinkAppendHandle<T, I, K, O>
         && hoodieRecord.getCurrentLocation().getInstantTime().equals("U");
   }
 
+  protected boolean addBlockIdentifier() {
+    return false;
+  }
+
   @Override
   public List<WriteStatus> close() {
     try {
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/JavaTaskContextSupplier.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/JavaTaskContextSupplier.java
index 628201ccc25..b40419a8015 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/JavaTaskContextSupplier.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/JavaTaskContextSupplier.java
@@ -44,4 +44,10 @@ public class JavaTaskContextSupplier extends TaskContextSupplier {
   public Option<String> getProperty(EngineProperty prop) {
     return Option.empty();
   }
+
+  @Override
+  public Supplier<Integer> getAttemptNumberSupplier() {
+    return () -> 0;
+  }
+
 }
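[Editor's note] With the writer-side changes above, every data and delete block written to a data table (not the metadata table, and not by Flink) carries a BLOCK_IDENTIFIER header of the form "<attemptNumber>,<blockSequenceNumber>". A sketch of composing and parsing that value; the helper class below is hypothetical, and only the comma-separated format mirrors the commit:

    // Editor's sketch: the BLOCK_IDENTIFIER header value is
    // "<attemptNumber>,<blockSequenceNumber>". This helper class is
    // hypothetical; only the format mirrors the commit.
    final class BlockIdentifier {
      final long attemptNumber;
      final int blockSequenceNumber;

      BlockIdentifier(long attemptNumber, int blockSequenceNumber) {
        this.attemptNumber = attemptNumber;
        this.blockSequenceNumber = blockSequenceNumber;
      }

      // Writer side: what HoodieAppendHandle#getUpdatedHeader stores in the header.
      String encode() {
        return attemptNumber + "," + blockSequenceNumber;
      }

      // Reader side: attempt number first, block sequence number second.
      static BlockIdentifier parse(String headerValue) {
        String[] parts = headerValue.split(",");
        return new BlockIdentifier(Long.parseLong(parts[0]), Integer.parseInt(parts[1]));
      }
    }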
diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java
index 68b7ed18a7f..ebcdfd5daa1 100644
--- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java
+++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java
@@ -184,6 +184,11 @@ public abstract class HoodieJavaClientTestHarness extends HoodieWriterClientTest
     public Option<String> getProperty(EngineProperty prop) {
       return Option.empty();
     }
+
+    @Override
+    public Supplier<Integer> getAttemptNumberSupplier() {
+      return () -> (int)attemptId;
+    }
   }
 
   protected void initFileSystem(String basePath, Configuration hadoopConf) {
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkTaskContextSupplier.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkTaskContextSupplier.java
index d118f0ead8d..7cfa411511a 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkTaskContextSupplier.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkTaskContextSupplier.java
@@ -50,6 +50,11 @@ public class SparkTaskContextSupplier extends TaskContextSupplier implements Ser
     return () -> TaskContext.get().taskAttemptId();
   }
 
+  @Override
+  public Supplier<Integer> getAttemptNumberSupplier() {
+    return () -> TaskContext.get().attemptNumber();
+  }
+
   @Override
   public Option<String> getProperty(EngineProperty prop) {
     if (prop == EngineProperty.TOTAL_MEMORY_AVAILABLE) {
@@ -89,4 +94,5 @@ public class SparkTaskContextSupplier extends TaskContextSupplier implements Ser
     }
     throw new HoodieException("Unknown engine property :" + prop);
   }
+
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/LocalTaskContextSupplier.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/LocalTaskContextSupplier.java
index b0decb8696f..bff42692340 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/LocalTaskContextSupplier.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/LocalTaskContextSupplier.java
@@ -45,4 +45,10 @@ public final class LocalTaskContextSupplier extends TaskContextSupplier {
   public Option<String> getProperty(EngineProperty prop) {
     return Option.empty();
   }
+
+  @Override
+  public Supplier<Integer> getAttemptNumberSupplier() {
+    return () -> 0;
+  }
+
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/TaskContextSupplier.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/TaskContextSupplier.java
index 813236c07a8..24a6d0e527a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/TaskContextSupplier.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/TaskContextSupplier.java
@@ -35,4 +35,9 @@ public abstract class TaskContextSupplier implements Serializable {
   public abstract Supplier<Long> getAttemptIdSupplier();
 
   public abstract Option<String> getProperty(EngineProperty prop);
+
+  /**
+   * @return the attempt number for the task of interest. The attempt number starts at 0 and goes up by 1 on retries.
+   */
+  public abstract Supplier<Integer> getAttemptNumberSupplier();
 }
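[Editor's note] The abstract contract above is implemented per engine: Spark reports the real retry count, Flink opts out with -1 (and FlinkAppendHandle disables the identifier entirely), while the Java and local engines report 0 since they do not retry tasks. A hypothetical supplier for an engine without retries might look like the following; the class name is invented, and the set of abstract methods is assumed from the diffs in this commit:

    import java.util.function.Supplier;

    import org.apache.hudi.common.engine.EngineProperty;
    import org.apache.hudi.common.engine.TaskContextSupplier;
    import org.apache.hudi.common.util.Option;

    // Editor's sketch of a supplier for a hypothetical engine that never
    // retries tasks: the attempt number is always 0, matching the Java/local
    // implementations shown above.
    public class SingleAttemptTaskContextSupplier extends TaskContextSupplier {
      @Override
      public Supplier<Integer> getPartitionIdSupplier() {
        return () -> 0;
      }

      @Override
      public Supplier<Integer> getStageIdSupplier() {
        return () -> 0;
      }

      @Override
      public Supplier<Long> getAttemptIdSupplier() {
        return () -> 0L;
      }

      @Override
      public Option<String> getProperty(EngineProperty prop) {
        return Option.empty();
      }

      @Override
      public Supplier<Integer> getAttemptNumberSupplier() {
        return () -> 0; // no retries, so the only attempt is number 0
      }
    }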
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
index 94bd68e62c4..3678efe7862 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
@@ -61,12 +61,13 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.table.log.block.HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK;
-import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.BLOCK_SEQUENCE_NUMBER;
+import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.BLOCK_IDENTIFIER;
 import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.COMPACTED_BLOCK_TIMES;
 import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.INSTANT_TIME;
 import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME;
@@ -225,6 +226,7 @@ public abstract class AbstractHoodieLogRecordReader {
     currentInstantLogBlocks = new ArrayDeque<>();
     List<HoodieLogBlock> validLogBlockInstants = new ArrayList<>();
     Map<String, Map<Long, List<Pair<Integer, HoodieLogBlock>>>> blockSequenceMapPerCommit = new HashMap<>();
+    AtomicBoolean blockIdentifiersPresent = new AtomicBoolean(false);
 
     progress = 0.0f;
     totalLogFiles = new AtomicLong(0);
@@ -251,13 +253,13 @@ public abstract class AbstractHoodieLogRecordReader {
         // Use the HoodieLogFileReader to iterate through the blocks in the log file
         HoodieLogBlock logBlock = logFormatReaderWrapper.next();
         final String instantTime = logBlock.getLogBlockHeader().get(INSTANT_TIME);
-        final String blockSequenceNumberStr = logBlock.getLogBlockHeader().getOrDefault(BLOCK_SEQUENCE_NUMBER, "");
-        int blockSeqNo = -1;
-        long attemptNo = -1L;
-        if (!StringUtils.isNullOrEmpty(blockSequenceNumberStr)) {
-          String[] parts = blockSequenceNumberStr.split(",");
-          attemptNo = Long.parseLong(parts[0]);
-          blockSeqNo = Integer.parseInt(parts[1]);
+        final String blockIdentifier = logBlock.getLogBlockHeader().getOrDefault(BLOCK_IDENTIFIER, StringUtils.EMPTY_STRING);
+        int blockSeqNumber = -1;
+        long attemptNumber = -1L;
+        if (!StringUtils.isNullOrEmpty(blockIdentifier)) {
+          String[] parts = blockIdentifier.split(",");
+          attemptNumber = Long.parseLong(parts[0]);
+          blockSeqNumber = Integer.parseInt(parts[1]);
         }
         totalLogBlocks.incrementAndGet();
         if (logBlock.getBlockType() != CORRUPT_BLOCK
@@ -285,14 +287,14 @@ public abstract class AbstractHoodieLogRecordReader {
             // store the current block
             currentInstantLogBlocks.push(logBlock);
             validLogBlockInstants.add(logBlock);
-            updateBlockSequenceTracker(logBlock, instantTime, blockSeqNo, attemptNo, blockSequenceMapPerCommit);
+            updateBlockSequenceTracker(logBlock, instantTime, blockSeqNumber, attemptNumber, blockSequenceMapPerCommit, blockIdentifiersPresent);
            break;
          case DELETE_BLOCK:
            LOG.info("Reading a delete block from file " + logFile.getPath());
            // store deletes so can be rolled back
            currentInstantLogBlocks.push(logBlock);
            validLogBlockInstants.add(logBlock);
-           updateBlockSequenceTracker(logBlock, instantTime, blockSeqNo, attemptNo, blockSequenceMapPerCommit);
+           updateBlockSequenceTracker(logBlock, instantTime, blockSeqNumber, attemptNumber, blockSequenceMapPerCommit, blockIdentifiersPresent);
            break;
          case COMMAND_BLOCK:
            // Consider the following scenario
@@ -383,14 +385,19 @@ public abstract class AbstractHoodieLogRecordReader {
       }
       // merge the last read block when all the blocks are done reading
       if (!currentInstantLogBlocks.isEmpty()) {
-        Pair<Boolean, List<HoodieLogBlock>> dedupedLogBlocksInfo = reconcileSpuriousBlocksAndGetValidOnes(validLogBlockInstants, blockSequenceMapPerCommit);
-        if (dedupedLogBlocksInfo.getKey()) {
-          // if there are duplicate log blocks that needs to be removed, we re-create the queue for valid log blocks from dedupedLogBlocks
-          currentInstantLogBlocks = new ArrayDeque<>();
-          dedupedLogBlocksInfo.getValue().forEach(block -> currentInstantLogBlocks.push(block));
-          LOG.info("Merging the final data blocks");
-          processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt);
-        } else {
+        boolean duplicateBlocksDetected = false;
+        if (blockIdentifiersPresent.get()) {
+          Pair<Boolean, List<HoodieLogBlock>> dedupedLogBlocksInfo = reconcileSpuriousBlocksAndGetValidOnes(validLogBlockInstants, blockSequenceMapPerCommit);
+          duplicateBlocksDetected = dedupedLogBlocksInfo.getKey();
+          if (duplicateBlocksDetected) {
+            // if there are duplicate log blocks that needs to be removed, we re-create the queue for valid log blocks from dedupedLogBlocks
+            currentInstantLogBlocks = new ArrayDeque<>();
+            dedupedLogBlocksInfo.getValue().forEach(block -> currentInstantLogBlocks.push(block));
+            LOG.info("Merging the final data blocks");
+            processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt);
+          }
+        }
+        if (!duplicateBlocksDetected) {
           // if there are no dups, we can take currentInstantLogBlocks as is.
           LOG.info("Merging the final data blocks");
           processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt);
@@ -429,6 +436,10 @@ public abstract class AbstractHoodieLogRecordReader {
     boolean dupsFound = blockSequenceMapPerCommit.values().stream().anyMatch(perCommitBlockList -> perCommitBlockList.size() > 1);
     if (dupsFound) {
+      if (LOG.isDebugEnabled()) {
+        logBlockSequenceMapping(blockSequenceMapPerCommit);
+      }
+
       // duplicates are found. we need to remove duplicate log blocks.
       for (Map.Entry<String, Map<Long, List<Pair<Integer, HoodieLogBlock>>>> entry : blockSequenceMapPerCommit.entrySet()) {
         Map<Long, List<Pair<Integer, HoodieLogBlock>>> perCommitBlockSequences = entry.getValue();
@@ -436,23 +447,22 @@ public abstract class AbstractHoodieLogRecordReader {
           // only those that have more than 1 sequence needs deduping.
           int maxSequenceCount = -1;
           int maxAttemptNo = -1;
-          int totalSequences = perCommitBlockSequences.size();
-          int counter = 0;
           for (Map.Entry<Long, List<Pair<Integer, HoodieLogBlock>>> perAttemptEntries : perCommitBlockSequences.entrySet()) {
             Long attemptNo = perAttemptEntries.getKey();
             int size = perAttemptEntries.getValue().size();
-            if (maxSequenceCount < size) {
+            if (maxSequenceCount <= size) {
               maxSequenceCount = size;
               maxAttemptNo = Math.toIntExact(attemptNo);
             }
-            counter++;
           }
-          // for other sequence (!= maxSequenceIndex), we need to remove the corresponding logBlocks from allValidLogBlocks
+          // for other sequences (!= maxSequenceIndex), we need to remove the corresponding logBlocks from allValidLogBlocks
           for (Map.Entry<Long, List<Pair<Integer, HoodieLogBlock>>> perAttemptEntries : perCommitBlockSequences.entrySet()) {
             Long attemptNo = perAttemptEntries.getKey();
             if (maxAttemptNo != attemptNo) {
               List<HoodieLogBlock> logBlocksToRemove = perCommitBlockSequences.get(attemptNo).stream().map(pair -> pair.getValue()).collect(Collectors.toList());
-              logBlocksToRemove.forEach(logBlockToRemove -> allValidLogBlocks.remove(logBlocksToRemove));
+              logBlocksToRemove.forEach(logBlockToRemove -> {
+                allValidLogBlocks.remove(logBlockToRemove);
+              });
             }
           }
         }
@@ -463,6 +473,21 @@ public abstract class AbstractHoodieLogRecordReader {
     }
   }
 
+  private void logBlockSequenceMapping(Map<String, Map<Long, List<Pair<Integer, HoodieLogBlock>>>> blockSequenceMapPerCommit) {
+    LOG.warn("Duplicate log blocks found ");
+    for (Map.Entry<String, Map<Long, List<Pair<Integer, HoodieLogBlock>>>> entry : blockSequenceMapPerCommit.entrySet()) {
+      if (entry.getValue().size() > 1) {
+        LOG.warn("\tCommit time " + entry.getKey());
+        Map<Long, List<Pair<Integer, HoodieLogBlock>>> value = entry.getValue();
+        for (Map.Entry<Long, List<Pair<Integer, HoodieLogBlock>>> attemptsSeq : value.entrySet()) {
+          LOG.warn("\t\tAttempt number " + attemptsSeq.getKey());
+          attemptsSeq.getValue().forEach(entryValue -> LOG.warn("\t\t\tLog block sequence no : " + entryValue.getKey() + ", log file "
+              + entryValue.getValue().getBlockContentLocation().get().getLogFile().getPath().toString()));
+        }
+      }
+    }
+  }
+
   /**
    * Updates map tracking block seq no.
    * Here is the map structure.
@@ -483,21 +508,23 @@ public abstract class AbstractHoodieLogRecordReader {
    *
    * @param logBlock                  log block of interest to be added.
    * @param instantTime               commit time of interest.
-   * @param blockSeqNo                block sequence number.
+   * @param blockSeqNumber            block sequence number.
    * @param blockSequenceMapPerCommit map tracking per commit block sequences.
    */
-  private void updateBlockSequenceTracker(HoodieLogBlock logBlock, String instantTime, int blockSeqNo, long attemptNo,
-                                          Map<String, Map<Long, List<Pair<Integer, HoodieLogBlock>>>> blockSequenceMapPerCommit) {
-    if (blockSeqNo != -1 && attemptNo != -1) { // update the block sequence tracker for log blocks containing the same.
+  private void updateBlockSequenceTracker(HoodieLogBlock logBlock, String instantTime, int blockSeqNumber, long attemptNumber,
+                                          Map<String, Map<Long, List<Pair<Integer, HoodieLogBlock>>>> blockSequenceMapPerCommit,
+                                          AtomicBoolean blockIdentifiersPresent) {
+    if (blockSeqNumber != -1 && attemptNumber != -1) { // update the block sequence tracker for log blocks containing the same.
+      blockIdentifiersPresent.set(true);
       blockSequenceMapPerCommit.computeIfAbsent(instantTime, entry -> new HashMap<>());
       Map<Long, List<Pair<Integer, HoodieLogBlock>>> curCommitBlockMap = blockSequenceMapPerCommit.get(instantTime);
-      if (curCommitBlockMap.containsKey(attemptNo)) {
+      if (curCommitBlockMap.containsKey(attemptNumber)) {
         // append to existing map entry
-        curCommitBlockMap.get(attemptNo).add(Pair.of(blockSeqNo, logBlock));
+        curCommitBlockMap.get(attemptNumber).add(Pair.of(blockSeqNumber, logBlock));
       } else {
         // create a new map entry
-        curCommitBlockMap.put(attemptNo, new ArrayList<>());
-        curCommitBlockMap.get(attemptNo).add(Pair.of(blockSeqNo, logBlock));
+        curCommitBlockMap.put(attemptNumber, new ArrayList<>());
+        curCommitBlockMap.get(attemptNumber).add(Pair.of(blockSeqNumber, logBlock));
       }
       // update the latest to block sequence tracker
       blockSequenceMapPerCommit.put(instantTime, curCommitBlockMap);
@@ -505,8 +532,8 @@ public abstract class AbstractHoodieLogRecordReader {
       // all of older blocks are considered valid. there should be only one list for older commits where block sequence number is not present.
       blockSequenceMapPerCommit.computeIfAbsent(instantTime, entry -> new HashMap<>());
       Map<Long, List<Pair<Integer, HoodieLogBlock>>> curCommitBlockMap = blockSequenceMapPerCommit.get(instantTime);
-      curCommitBlockMap.put(0L, new ArrayList<>());
-      curCommitBlockMap.get(0L).add(Pair.of(blockSeqNo, logBlock));
+      curCommitBlockMap.computeIfAbsent(0L, entry -> new ArrayList<>());
+      curCommitBlockMap.get(0L).add(Pair.of(blockSeqNumber, logBlock));
       // update the latest to block sequence tracker
       blockSequenceMapPerCommit.put(instantTime, curCommitBlockMap);
     }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
index efec05c857c..0bff4e9d206 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
@@ -168,7 +168,7 @@ public abstract class HoodieLogBlock {
    * new enums at the end.
    */
   public enum HeaderMetadataType {
-    INSTANT_TIME, TARGET_INSTANT_TIME, SCHEMA, COMMAND_BLOCK_TYPE, COMPACTED_BLOCK_TIMES, RECORD_POSITIONS, BLOCK_SEQUENCE_NUMBER
+    INSTANT_TIME, TARGET_INSTANT_TIME, SCHEMA, COMMAND_BLOCK_TYPE, COMPACTED_BLOCK_TIMES, RECORD_POSITIONS, BLOCK_IDENTIFIER
   }
 
   /**
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index f0ca8ef9944..d9ca8b49553 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -2920,7 +2920,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
   private static Map<HeaderMetadataType, String> getUpdatedHeader(Map<HeaderMetadataType, String> header, int blockSequenceNumber) {
     Map<HeaderMetadataType, String> updatedHeader = new HashMap<>();
     updatedHeader.putAll(header);
-    updatedHeader.put(HeaderMetadataType.BLOCK_SEQUENCE_NUMBER, String.valueOf(blockSequenceNumber));
+    updatedHeader.put(HeaderMetadataType.BLOCK_IDENTIFIER, String.valueOf(blockSequenceNumber));
     return updatedHeader;
   }
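[Editor's note] To summarize the reader-side policy the diff above implements: deduplication only runs when at least one scanned block carried an identifier (blockIdentifiersPresent); per commit, blocks are grouped by attempt number, and only the attempt that wrote the most blocks survives, on the assumption that the final, successful attempt wrote the complete set. A standalone sketch of that selection rule; the class and method names are illustrative:

    import java.util.List;
    import java.util.Map;

    // Editor's sketch of the selection rule: among all attempts that wrote
    // blocks for one commit, keep the attempt with the most blocks. Ties go to
    // the entry iterated last, mirroring the commit's
    // "maxSequenceCount <= size" comparison.
    final class SpuriousBlockFilter {
      static <B> List<B> pickValidBlocks(Map<Long, List<B>> blocksPerAttempt) {
        long bestAttempt = -1L;
        int bestCount = -1;
        for (Map.Entry<Long, List<B>> e : blocksPerAttempt.entrySet()) {
          if (e.getValue().size() >= bestCount) {
            bestCount = e.getValue().size();
            bestAttempt = e.getKey();
          }
        }
        return blocksPerAttempt.get(bestAttempt); // assumes a non-empty map
      }
    }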