This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch release-0.14.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit bba95305a073b5ffe94fb579b8a525fd92d54294
Author: Sivabalan Narayanan <n.siv...@gmail.com>
AuthorDate: Sun Sep 10 14:11:49 2023 -0400

    [HUDI-6758] Fixing deducing spurious log blocks due to spark retries (#9611)
    
    - We attempted a fix in #9545 to avoid reading spurious log blocks on the
      reader side. When I tested that patch end to end, I found some gaps:
      specifically, the attempt ID we fetched from taskContextSupplier was not
      the task's attempt number. This patch fixes that. Tested end to end by
      simulating Spark retries and spurious log blocks; the reader is able to
      detect them and ignore the duplicate copies of log blocks.
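
    For context, Spark exposes two different attempt values on TaskContext:
    taskAttemptId() is unique across all task attempts in the application
    (two attempts of the same task get unrelated IDs), while attemptNumber()
    starts at 0 for a task and goes up by 1 on each retry, which is what the
    reader needs in order to group blocks written by retries of the same
    task. A minimal sketch of the distinction (the rdd variable and the
    partition body are hypothetical; the two getters are Spark's documented
    TaskContext API):

        rdd.foreachPartition(iter -> {
          org.apache.spark.TaskContext ctx = org.apache.spark.TaskContext.get();
          int attemptNumber = ctx.attemptNumber(); // 0 on first attempt, 1 on first retry, ...
          long attemptId = ctx.taskAttemptId();    // unique per attempt across the whole app
        });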
---
 .../org/apache/hudi/io/HoodieAppendHandle.java     | 22 ++++-
 .../org/apache/hudi/DummyTaskContextSupplier.java  |  5 ++
 .../hudi/client/FlinkTaskContextSupplier.java      |  5 ++
 .../java/org/apache/hudi/io/FlinkAppendHandle.java |  4 +
 .../client/common/JavaTaskContextSupplier.java     |  6 ++
 .../testutils/HoodieJavaClientTestHarness.java     |  5 ++
 .../hudi/client/SparkTaskContextSupplier.java      |  6 ++
 .../common/engine/LocalTaskContextSupplier.java    |  6 ++
 .../hudi/common/engine/TaskContextSupplier.java    |  5 ++
 .../table/log/AbstractHoodieLogRecordReader.java   | 95 ++++++++++++++--------
 .../common/table/log/block/HoodieLogBlock.java     |  2 +-
 .../common/functional/TestHoodieLogFormat.java     |  2 +-
 12 files changed, 123 insertions(+), 40 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 65f79c5147e..ca081fce60f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -54,6 +54,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieAppendException;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieUpsertException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.avro.Schema;
@@ -132,6 +133,8 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
   // Block Sequence number will be used to detect duplicate log blocks(by log reader) added due to spark task retries.
   // It should always start with 0 for a given file slice. for roll overs and delete blocks, we increment by 1.
   private int blockSequenceNumber = 0;
+  // On task failures, a given task could be retried. So, this attempt number will track the number of attempts.
+  private int attemptNumber = 0;
 
   /**
    * This is used by log compaction only.
@@ -143,6 +146,7 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
     this.useWriterSchema = true;
     this.isLogCompaction = true;
     this.header.putAll(header);
+    this.attemptNumber = taskContextSupplier.getAttemptNumberSupplier().get();
   }
 
   public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
@@ -153,6 +157,7 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
     this.sizeEstimator = new DefaultSizeEstimator();
     this.statuses = new ArrayList<>();
     this.recordProperties.putAll(config.getProps());
+    this.attemptNumber = taskContextSupplier.getAttemptNumberSupplier().get();
   }
 
   public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
@@ -461,11 +466,13 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
             ? HoodieRecord.RECORD_KEY_METADATA_FIELD
             : hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
 
-        blocks.add(getBlock(config, pickLogDataBlockFormat(), recordList, getUpdatedHeader(header, blockSequenceNumber++, taskContextSupplier.getAttemptIdSupplier().get()), keyField));
+        blocks.add(getBlock(config, pickLogDataBlockFormat(), recordList, getUpdatedHeader(header, blockSequenceNumber++, attemptNumber, config,
+            addBlockIdentifier()), keyField));
       }
 
       if (appendDeleteBlocks && recordsToDelete.size() > 0) {
-        blocks.add(new HoodieDeleteBlock(recordsToDelete.toArray(new DeleteRecord[0]), getUpdatedHeader(header, blockSequenceNumber++, taskContextSupplier.getAttemptIdSupplier().get())));
+        blocks.add(new HoodieDeleteBlock(recordsToDelete.toArray(new DeleteRecord[0]), getUpdatedHeader(header, blockSequenceNumber++, attemptNumber, config,
+            addBlockIdentifier())));
       }
 
       if (blocks.size() > 0) {
@@ -562,6 +569,10 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
     return true;
   }
 
+  protected boolean addBlockIdentifier() {
+    return true;
+  }
+
   private void writeToBuffer(HoodieRecord<T> record) {
     if (!partitionPath.equals(record.getPartitionPath())) {
       HoodieUpsertException failureEx = new HoodieUpsertException("mismatched partition path, record partition: "
@@ -635,10 +646,13 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
     }
   }
 
-  private static Map<HeaderMetadataType, String> getUpdatedHeader(Map<HeaderMetadataType, String> header, int blockSequenceNumber, long attemptNumber) {
+  private static Map<HeaderMetadataType, String> getUpdatedHeader(Map<HeaderMetadataType, String> header, int blockSequenceNumber, long attemptNumber,
+                                                                  HoodieWriteConfig config, boolean addBlockIdentifier) {
     Map<HeaderMetadataType, String> updatedHeader = new HashMap<>();
     updatedHeader.putAll(header);
-    updatedHeader.put(HeaderMetadataType.BLOCK_SEQUENCE_NUMBER, String.valueOf(attemptNumber) + "," + String.valueOf(blockSequenceNumber));
+    if (addBlockIdentifier && !HoodieTableMetadata.isMetadataTable(config.getBasePath())) { // add block sequence numbers only for data table.
+      updatedHeader.put(HeaderMetadataType.BLOCK_IDENTIFIER, String.valueOf(attemptNumber) + "," + String.valueOf(blockSequenceNumber));
+    }
     return updatedHeader;
   }
 
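The value written into the BLOCK_IDENTIFIER header above is simply
"<attemptNumber>,<blockSequenceNumber>". A minimal sketch of how that value
round-trips between writer and reader (standalone snippet; the variable names
are illustrative, only the comma-joined format comes from the patch):

    // writer side: task attempt 1 writing its third block in this file slice
    String blockIdentifier = String.valueOf(1) + "," + String.valueOf(2); // "1,2"

    // reader side: split the header value back into its two components
    String[] parts = blockIdentifier.split(",");
    long attemptNumber = Long.parseLong(parts[0]);    // 1
    int blockSeqNumber = Integer.parseInt(parts[1]);  // 2
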
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/DummyTaskContextSupplier.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/DummyTaskContextSupplier.java
index d2c07e35509..d87b6147302 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/DummyTaskContextSupplier.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/DummyTaskContextSupplier.java
@@ -45,4 +45,9 @@ public class DummyTaskContextSupplier extends TaskContextSupplier {
   public Option<String> getProperty(EngineProperty prop) {
     return null;
   }
+
+  @Override
+  public Supplier<Integer> getAttemptNumberSupplier() {
+    return null;
+  }
 }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/FlinkTaskContextSupplier.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/FlinkTaskContextSupplier.java
index aab248fc3cf..03c835c5553 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/FlinkTaskContextSupplier.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/FlinkTaskContextSupplier.java
@@ -62,4 +62,9 @@ public class FlinkTaskContextSupplier extends TaskContextSupplier {
     return Option.empty();
   }
 
+  @Override
+  public Supplier<Integer> getAttemptNumberSupplier() {
+    return () -> -1;
+  }
+
 }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
index 4b56d6a442c..3dc76ed435e 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
@@ -92,6 +92,10 @@ public class FlinkAppendHandle<T, I, K, O>
         && hoodieRecord.getCurrentLocation().getInstantTime().equals("U");
   }
 
+  protected boolean addBlockIdentifier() {
+    return false;
+  }
+
   @Override
   public List<WriteStatus> close() {
     try {
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/JavaTaskContextSupplier.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/JavaTaskContextSupplier.java
index 628201ccc25..b40419a8015 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/JavaTaskContextSupplier.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/JavaTaskContextSupplier.java
@@ -44,4 +44,10 @@ public class JavaTaskContextSupplier extends TaskContextSupplier {
   public Option<String> getProperty(EngineProperty prop) {
     return Option.empty();
   }
+
+  @Override
+  public Supplier<Integer> getAttemptNumberSupplier() {
+    return () -> 0;
+  }
+
 }
diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java
index 68b7ed18a7f..ebcdfd5daa1 100644
--- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java
+++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java
@@ -184,6 +184,11 @@ public abstract class HoodieJavaClientTestHarness extends HoodieWriterClientTest
     public Option<String> getProperty(EngineProperty prop) {
       return Option.empty();
     }
+
+    @Override
+    public Supplier<Integer> getAttemptNumberSupplier() {
+      return () -> (int)attemptId;
+    }
   }
 
   protected void initFileSystem(String basePath, Configuration hadoopConf) {
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkTaskContextSupplier.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkTaskContextSupplier.java
index d118f0ead8d..7cfa411511a 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkTaskContextSupplier.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkTaskContextSupplier.java
@@ -50,6 +50,11 @@ public class SparkTaskContextSupplier extends TaskContextSupplier implements Ser
     return () -> TaskContext.get().taskAttemptId();
   }
 
+  @Override
+  public Supplier<Integer> getAttemptNumberSupplier() {
+    return () -> TaskContext.get().attemptNumber();
+  }
+
   @Override
   public Option<String> getProperty(EngineProperty prop) {
     if (prop == EngineProperty.TOTAL_MEMORY_AVAILABLE) {
@@ -89,4 +94,5 @@ public class SparkTaskContextSupplier extends TaskContextSupplier implements Ser
     }
     throw new HoodieException("Unknown engine property :" + prop);
   }
+
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/LocalTaskContextSupplier.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/LocalTaskContextSupplier.java
index b0decb8696f..bff42692340 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/LocalTaskContextSupplier.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/LocalTaskContextSupplier.java
@@ -45,4 +45,10 @@ public final class LocalTaskContextSupplier extends TaskContextSupplier {
   public Option<String> getProperty(EngineProperty prop) {
     return Option.empty();
   }
+
+  @Override
+  public Supplier<Integer> getAttemptNumberSupplier() {
+    return () -> 0;
+  }
+
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/TaskContextSupplier.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/TaskContextSupplier.java
index 813236c07a8..24a6d0e527a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/TaskContextSupplier.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/TaskContextSupplier.java
@@ -35,4 +35,9 @@ public abstract class TaskContextSupplier implements Serializable {
   public abstract Supplier<Long> getAttemptIdSupplier();
 
   public abstract Option<String> getProperty(EngineProperty prop);
+
+  /**
+   * @returns the attempt number for the task of interest. Attempt starts with 0 and goes up by 1 on retries.
+   */
+  public abstract Supplier<Integer> getAttemptNumberSupplier();
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
index 94bd68e62c4..3678efe7862 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
@@ -61,12 +61,13 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.table.log.block.HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK;
-import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.BLOCK_SEQUENCE_NUMBER;
+import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.BLOCK_IDENTIFIER;
 import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.COMPACTED_BLOCK_TIMES;
 import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.INSTANT_TIME;
 import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME;
@@ -225,6 +226,7 @@ public abstract class AbstractHoodieLogRecordReader {
     currentInstantLogBlocks = new ArrayDeque<>();
     List<HoodieLogBlock> validLogBlockInstants = new ArrayList<>();
     Map<String, Map<Long, List<Pair<Integer, HoodieLogBlock>>>> blockSequenceMapPerCommit = new HashMap<>();
+    AtomicBoolean blockIdentifiersPresent = new AtomicBoolean(false);
 
     progress = 0.0f;
     totalLogFiles = new AtomicLong(0);
@@ -251,13 +253,13 @@ public abstract class AbstractHoodieLogRecordReader {
         // Use the HoodieLogFileReader to iterate through the blocks in the log file
         HoodieLogBlock logBlock = logFormatReaderWrapper.next();
         final String instantTime = logBlock.getLogBlockHeader().get(INSTANT_TIME);
-        final String blockSequenceNumberStr = logBlock.getLogBlockHeader().getOrDefault(BLOCK_SEQUENCE_NUMBER, "");
-        int blockSeqNo = -1;
-        long attemptNo = -1L;
-        if (!StringUtils.isNullOrEmpty(blockSequenceNumberStr)) {
-          String[] parts = blockSequenceNumberStr.split(",");
-          attemptNo = Long.parseLong(parts[0]);
-          blockSeqNo = Integer.parseInt(parts[1]);
+        final String blockIdentifier = logBlock.getLogBlockHeader().getOrDefault(BLOCK_IDENTIFIER, StringUtils.EMPTY_STRING);
+        int blockSeqNumber = -1;
+        long attemptNumber = -1L;
+        if (!StringUtils.isNullOrEmpty(blockIdentifier)) {
+          String[] parts = blockIdentifier.split(",");
+          attemptNumber = Long.parseLong(parts[0]);
+          blockSeqNumber = Integer.parseInt(parts[1]);
         }
         totalLogBlocks.incrementAndGet();
         if (logBlock.getBlockType() != CORRUPT_BLOCK
@@ -285,14 +287,14 @@ public abstract class AbstractHoodieLogRecordReader {
             // store the current block
             currentInstantLogBlocks.push(logBlock);
             validLogBlockInstants.add(logBlock);
-            updateBlockSequenceTracker(logBlock, instantTime, blockSeqNo, attemptNo, blockSequenceMapPerCommit);
+            updateBlockSequenceTracker(logBlock, instantTime, blockSeqNumber, attemptNumber, blockSequenceMapPerCommit, blockIdentifiersPresent);
             break;
           case DELETE_BLOCK:
             LOG.info("Reading a delete block from file " + logFile.getPath());
             // store deletes so can be rolled back
             currentInstantLogBlocks.push(logBlock);
             validLogBlockInstants.add(logBlock);
-            updateBlockSequenceTracker(logBlock, instantTime, blockSeqNo, attemptNo, blockSequenceMapPerCommit);
+            updateBlockSequenceTracker(logBlock, instantTime, blockSeqNumber, attemptNumber, blockSequenceMapPerCommit, blockIdentifiersPresent);
             break;
           case COMMAND_BLOCK:
             // Consider the following scenario
@@ -383,14 +385,19 @@ public abstract class AbstractHoodieLogRecordReader {
       }
       // merge the last read block when all the blocks are done reading
       if (!currentInstantLogBlocks.isEmpty()) {
-        Pair<Boolean, List<HoodieLogBlock>> dedupedLogBlocksInfo = reconcileSpuriousBlocksAndGetValidOnes(validLogBlockInstants, blockSequenceMapPerCommit);
-        if (dedupedLogBlocksInfo.getKey()) {
-          // if there are duplicate log blocks that needs to be removed, we re-create the queue for valid log blocks from dedupedLogBlocks
-          currentInstantLogBlocks = new ArrayDeque<>();
-          dedupedLogBlocksInfo.getValue().forEach(block -> currentInstantLogBlocks.push(block));
-          LOG.info("Merging the final data blocks");
-          processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt);
-        } else {
+        boolean duplicateBlocksDetected = false;
+        if (blockIdentifiersPresent.get()) {
+          Pair<Boolean, List<HoodieLogBlock>> dedupedLogBlocksInfo = reconcileSpuriousBlocksAndGetValidOnes(validLogBlockInstants, blockSequenceMapPerCommit);
+          duplicateBlocksDetected = dedupedLogBlocksInfo.getKey();
+          if (duplicateBlocksDetected) {
+            // if there are duplicate log blocks that needs to be removed, we re-create the queue for valid log blocks from dedupedLogBlocks
+            currentInstantLogBlocks = new ArrayDeque<>();
+            dedupedLogBlocksInfo.getValue().forEach(block -> currentInstantLogBlocks.push(block));
+            LOG.info("Merging the final data blocks");
+            processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt);
+          }
+        }
+        if (!duplicateBlocksDetected) {
           // if there are no dups, we can take currentInstantLogBlocks as is.
           LOG.info("Merging the final data blocks");
           processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt);
@@ -429,6 +436,10 @@ public abstract class AbstractHoodieLogRecordReader {
 
     boolean dupsFound = blockSequenceMapPerCommit.values().stream().anyMatch(perCommitBlockList -> perCommitBlockList.size() > 1);
     if (dupsFound) {
+      if (LOG.isDebugEnabled()) {
+        logBlockSequenceMapping(blockSequenceMapPerCommit);
+      }
+
       // duplicates are found. we need to remove duplicate log blocks.
       for (Map.Entry<String, Map<Long, List<Pair<Integer, HoodieLogBlock>>>> entry: blockSequenceMapPerCommit.entrySet()) {
         Map<Long, List<Pair<Integer, HoodieLogBlock>>> perCommitBlockSequences = entry.getValue();
@@ -436,23 +447,22 @@ public abstract class AbstractHoodieLogRecordReader {
           // only those that have more than 1 sequence needs deduping.
           int maxSequenceCount = -1;
           int maxAttemptNo = -1;
-          int totalSequences = perCommitBlockSequences.size();
-          int counter = 0;
           for (Map.Entry<Long, List<Pair<Integer, HoodieLogBlock>>> perAttemptEntries : perCommitBlockSequences.entrySet()) {
             Long attemptNo = perAttemptEntries.getKey();
             int size = perAttemptEntries.getValue().size();
-            if (maxSequenceCount < size) {
+            if (maxSequenceCount <= size) {
               maxSequenceCount = size;
               maxAttemptNo = Math.toIntExact(attemptNo);
             }
-            counter++;
           }
-          // for other sequence (!= maxSequenceIndex), we need to remove the corresponding logBlocks from allValidLogBlocks
+          // for other sequences (!= maxSequenceIndex), we need to remove the corresponding logBlocks from allValidLogBlocks
           for (Map.Entry<Long, List<Pair<Integer, HoodieLogBlock>>> perAttemptEntries : perCommitBlockSequences.entrySet()) {
             Long attemptNo = perAttemptEntries.getKey();
             if (maxAttemptNo != attemptNo) {
               List<HoodieLogBlock> logBlocksToRemove = perCommitBlockSequences.get(attemptNo).stream().map(pair -> pair.getValue()).collect(Collectors.toList());
-              logBlocksToRemove.forEach(logBlockToRemove -> allValidLogBlocks.remove(logBlocksToRemove));
+              logBlocksToRemove.forEach(logBlockToRemove -> {
+                allValidLogBlocks.remove(logBlockToRemove);
+              });
             }
           }
         }
@@ -463,6 +473,21 @@ public abstract class AbstractHoodieLogRecordReader {
     }
   }
 
+  private void logBlockSequenceMapping(Map<String, Map<Long, List<Pair<Integer, HoodieLogBlock>>>> blockSequenceMapPerCommit) {
+    LOG.warn("Duplicate log blocks found ");
+    for (Map.Entry<String, Map<Long, List<Pair<Integer, HoodieLogBlock>>>> entry : blockSequenceMapPerCommit.entrySet()) {
+      if (entry.getValue().size() > 1) {
+        LOG.warn("\tCommit time " + entry.getKey());
+        Map<Long, List<Pair<Integer, HoodieLogBlock>>> value = entry.getValue();
+        for (Map.Entry<Long, List<Pair<Integer, HoodieLogBlock>>> attemptsSeq : value.entrySet()) {
+          LOG.warn("\t\tAttempt number " + attemptsSeq.getKey());
+          attemptsSeq.getValue().forEach(entryValue -> LOG.warn("\t\t\tLog block sequence no : " + entryValue.getKey() + ", log file "
+              + entryValue.getValue().getBlockContentLocation().get().getLogFile().getPath().toString()));
+        }
+      }
+    }
+  }
+
   /**
    * Updates map tracking block seq no.
    * Here is the map structure.
@@ -483,21 +508,23 @@ public abstract class AbstractHoodieLogRecordReader {
    *
    * @param logBlock log block of interest to be added.
    * @param instantTime commit time of interest.
-   * @param blockSeqNo block sequence number.
+   * @param blockSeqNumber block sequence number.
    * @param blockSequenceMapPerCommit map tracking per commit block sequences.
    */
-  private void updateBlockSequenceTracker(HoodieLogBlock logBlock, String instantTime, int blockSeqNo, long attemptNo,
-                                          Map<String, Map<Long, List<Pair<Integer, HoodieLogBlock>>>> blockSequenceMapPerCommit) {
-    if (blockSeqNo != -1 && attemptNo != -1) { // update the block sequence tracker for log blocks containing the same.
+  private void updateBlockSequenceTracker(HoodieLogBlock logBlock, String instantTime, int blockSeqNumber, long attemptNumber,
+                                          Map<String, Map<Long, List<Pair<Integer, HoodieLogBlock>>>> blockSequenceMapPerCommit,
+                                          AtomicBoolean blockIdentifiersPresent) {
+    if (blockSeqNumber != -1 && attemptNumber != -1) { // update the block sequence tracker for log blocks containing the same.
+      blockIdentifiersPresent.set(true);
       blockSequenceMapPerCommit.computeIfAbsent(instantTime, entry -> new HashMap<>());
       Map<Long, List<Pair<Integer, HoodieLogBlock>>> curCommitBlockMap = blockSequenceMapPerCommit.get(instantTime);
-      if (curCommitBlockMap.containsKey(attemptNo)) {
+      if (curCommitBlockMap.containsKey(attemptNumber)) {
         // append to existing map entry
-        curCommitBlockMap.get(attemptNo).add(Pair.of(blockSeqNo, logBlock));
+        curCommitBlockMap.get(attemptNumber).add(Pair.of(blockSeqNumber, logBlock));
       } else {
         // create a new map entry
-        curCommitBlockMap.put(attemptNo, new ArrayList<>());
-        curCommitBlockMap.get(attemptNo).add(Pair.of(blockSeqNo, logBlock));
+        curCommitBlockMap.put(attemptNumber, new ArrayList<>());
+        curCommitBlockMap.get(attemptNumber).add(Pair.of(blockSeqNumber, logBlock));
       }
       // update the latest to block sequence tracker
       blockSequenceMapPerCommit.put(instantTime, curCommitBlockMap);
@@ -505,8 +532,8 @@ public abstract class AbstractHoodieLogRecordReader {
       // all of older blocks are considered valid. there should be only one list for older commits where block sequence number is not present.
       blockSequenceMapPerCommit.computeIfAbsent(instantTime, entry -> new HashMap<>());
       Map<Long, List<Pair<Integer, HoodieLogBlock>>> curCommitBlockMap = blockSequenceMapPerCommit.get(instantTime);
-      curCommitBlockMap.put(0L, new ArrayList<>());
-      curCommitBlockMap.get(0L).add(Pair.of(blockSeqNo, logBlock));
+      curCommitBlockMap.computeIfAbsent(0L, entry -> new ArrayList<>());
+      curCommitBlockMap.get(0L).add(Pair.of(blockSeqNumber, logBlock));
       // update the latest to block sequence tracker
       blockSequenceMapPerCommit.put(instantTime, curCommitBlockMap);
     }
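
The reconciliation rule implemented above: for each commit, blocks are grouped
by attempt number, and only the attempt that produced the most blocks survives
(the change from < to <= lets an equally sized later-seen attempt win); blocks
from every other attempt are dropped as spurious. A compact sketch of that
selection with sample data (java.util imports assumed; this approximates the
rule by preferring the higher attempt number on ties, and is not the class's
actual code):

    // attemptNumber -> block sequence numbers seen for one commit
    Map<Long, List<Integer>> blocksPerAttempt = new HashMap<>();
    blocksPerAttempt.put(0L, Arrays.asList(0, 1));    // failed attempt wrote 2 blocks
    blocksPerAttempt.put(1L, Arrays.asList(0, 1, 2)); // successful retry wrote 3 blocks
    long winningAttempt = blocksPerAttempt.entrySet().stream()
        .max(Comparator.comparingInt((Map.Entry<Long, List<Integer>> e) -> e.getValue().size())
            .thenComparingLong(Map.Entry::getKey))
        .map(Map.Entry::getKey)
        .orElse(0L);                                  // -> 1L; attempt 0's blocks get removed
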
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
index efec05c857c..0bff4e9d206 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
@@ -168,7 +168,7 @@ public abstract class HoodieLogBlock {
    * new enums at the end.
    */
   public enum HeaderMetadataType {
-    INSTANT_TIME, TARGET_INSTANT_TIME, SCHEMA, COMMAND_BLOCK_TYPE, COMPACTED_BLOCK_TIMES, RECORD_POSITIONS, BLOCK_SEQUENCE_NUMBER
+    INSTANT_TIME, TARGET_INSTANT_TIME, SCHEMA, COMMAND_BLOCK_TYPE, COMPACTED_BLOCK_TIMES, RECORD_POSITIONS, BLOCK_IDENTIFIER
   }
 
   /**
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index f0ca8ef9944..d9ca8b49553 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -2920,7 +2920,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
   private static Map<HeaderMetadataType, String> getUpdatedHeader(Map<HeaderMetadataType, String> header, int blockSequenceNumber) {
     Map<HeaderMetadataType, String> updatedHeader = new HashMap<>();
     updatedHeader.putAll(header);
-    updatedHeader.put(HeaderMetadataType.BLOCK_SEQUENCE_NUMBER, String.valueOf(blockSequenceNumber));
+    updatedHeader.put(HeaderMetadataType.BLOCK_IDENTIFIER, String.valueOf(blockSequenceNumber));
     return updatedHeader;
   }
 
