This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9425e5ac8f6 [HUDI-6758] Detecting and skipping Spurious log blocks with MOR reads (#9545)
9425e5ac8f6 is described below

commit 9425e5ac8f67397e408d393bd7f80def58454d3d
Author: Sivabalan Narayanan <n.siv...@gmail.com>
AuthorDate: Tue Aug 29 21:33:27 2023 -0400

    [HUDI-6758] Detecting and skipping Spurious log blocks with MOR reads (#9545)
    
    - Detect and skip duplicate log blocks due to task retries.
    - Detection based on block sequence number that keeps
    increasing monotonically during rollover.
---
 .../org/apache/hudi/io/HoodieAppendHandle.java     |  14 +-
 .../table/log/AbstractHoodieLogRecordReader.java   | 169 ++++++++++++++++++---
 .../common/table/log/block/HoodieLogBlock.java     |   2 +-
 .../common/functional/TestHoodieLogFormat.java     | 143 +++++++++++++++--
 4 files changed, 295 insertions(+), 33 deletions(-)
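
In plain terms, the write path tags every log block header with "<attemptNumber>,<blockSequenceNumber>", and the read path keeps, per commit, only the attempt that produced the most blocks. A minimal, self-contained sketch of that idea follows; SketchBlock and pickValidBlocks are illustrative names only, not Hudi APIs — the actual implementation is in HoodieAppendHandle and AbstractHoodieLogRecordReader in the diff below.

    // Hedged sketch only: a simplified model of the duplicate-block detection in this commit.
    // SketchBlock / pickValidBlocks are invented names, not Hudi APIs.
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class SketchBlock {
      final String instantTime; // commit the block belongs to
      final long attemptNo;     // writer task attempt that produced the block
      final int blockSeqNo;     // 0, 1, 2, ... within one attempt (bumped per data/delete block)

      SketchBlock(String instantTime, long attemptNo, int blockSeqNo) {
        this.instantTime = instantTime;
        this.attemptNo = attemptNo;
        this.blockSeqNo = blockSeqNo;
      }

      // Write path: the header value is encoded as "attemptNo,blockSeqNo".
      String headerValue() {
        return attemptNo + "," + blockSeqNo;
      }
    }

    class SpuriousBlockSketch {
      // Read path: group blocks per commit and per attempt; if a commit shows more than one
      // attempt (a retried task re-appended its blocks), keep only the attempt with the most
      // blocks and drop the rest as spurious.
      static List<SketchBlock> pickValidBlocks(List<SketchBlock> all) {
        Map<String, Map<Long, List<SketchBlock>>> perCommit = new HashMap<>();
        for (SketchBlock b : all) {
          perCommit.computeIfAbsent(b.instantTime, k -> new HashMap<>())
              .computeIfAbsent(b.attemptNo, k -> new ArrayList<>())
              .add(b);
        }
        List<SketchBlock> valid = new ArrayList<>();
        for (Map<Long, List<SketchBlock>> attempts : perCommit.values()) {
          List<SketchBlock> best = null;
          for (List<SketchBlock> blocks : attempts.values()) {
            if (best == null || blocks.size() > best.size()) {
              best = blocks; // the most complete attempt wins
            }
          }
          if (best != null) {
            valid.addAll(best);
          }
        }
        return valid;
      }

      public static void main(String[] args) {
        List<SketchBlock> all = new ArrayList<>();
        // attempt 0 failed after two blocks; attempt 1 rewrote all three
        all.add(new SketchBlock("commit1", 0L, 0));
        all.add(new SketchBlock("commit1", 0L, 1));
        all.add(new SketchBlock("commit1", 1L, 0));
        all.add(new SketchBlock("commit1", 1L, 1));
        all.add(new SketchBlock("commit1", 1L, 2));
        System.out.println(all.get(0).headerValue());    // "0,0"
        System.out.println(pickValidBlocks(all).size()); // 3 -> only attempt 1 survives
      }
    }

With the inputs in main, attempt 0 (two blocks) is dropped and attempt 1's three blocks survive, which mirrors how the reader reconciles a partially failed first attempt.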

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index d0819aa8007..65f79c5147e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -129,6 +129,9 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
   private boolean useWriterSchema = false;
 
   private Properties recordProperties = new Properties();
+  // Block sequence number will be used to detect duplicate log blocks (by the log reader) added due to spark task retries.
+  // It should always start with 0 for a given file slice. For rollovers and delete blocks, we increment by 1.
+  private int blockSequenceNumber = 0;
 
   /**
    * This is used by log compaction only.
@@ -458,11 +461,11 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
             ? HoodieRecord.RECORD_KEY_METADATA_FIELD
             : hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
 
-        blocks.add(getBlock(config, pickLogDataBlockFormat(), recordList, header, keyField));
+        blocks.add(getBlock(config, pickLogDataBlockFormat(), recordList, getUpdatedHeader(header, blockSequenceNumber++, taskContextSupplier.getAttemptIdSupplier().get()), keyField));
       }
 
       if (appendDeleteBlocks && recordsToDelete.size() > 0) {
-        blocks.add(new HoodieDeleteBlock(recordsToDelete.toArray(new DeleteRecord[0]), header));
+        blocks.add(new HoodieDeleteBlock(recordsToDelete.toArray(new DeleteRecord[0]), getUpdatedHeader(header, blockSequenceNumber++, taskContextSupplier.getAttemptIdSupplier().get())));
       }
 
       if (blocks.size() > 0) {
@@ -632,6 +635,13 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
     }
   }
 
+  private static Map<HeaderMetadataType, String> getUpdatedHeader(Map<HeaderMetadataType, String> header, int blockSequenceNumber, long attemptNumber) {
+    Map<HeaderMetadataType, String> updatedHeader = new HashMap<>();
+    updatedHeader.putAll(header);
+    updatedHeader.put(HeaderMetadataType.BLOCK_SEQUENCE_NUMBER, String.valueOf(attemptNumber) + "," + String.valueOf(blockSequenceNumber));
+    return updatedHeader;
+  }
+
   private static HoodieLogBlock getBlock(HoodieWriteConfig writeConfig,
                                          HoodieLogBlock.HoodieLogBlockType logDataBlockFormat,
                                          List<HoodieRecord> records,
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
index 7b1e737610b..94bd68e62c4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
@@ -34,6 +34,7 @@ import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.InternalSchemaCache;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.CloseableMappingIterator;
 import org.apache.hudi.common.util.collection.Pair;
@@ -65,6 +66,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.table.log.block.HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK;
+import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.BLOCK_SEQUENCE_NUMBER;
 import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.COMPACTED_BLOCK_TIMES;
 import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.INSTANT_TIME;
 import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME;
@@ -108,8 +110,6 @@ public abstract class AbstractHoodieLogRecordReader {
   private final TypedProperties payloadProps;
   // Log File Paths
   protected final List<String> logFilePaths;
-  // Read Lazily flag
-  private final boolean readBlocksLazily;
   // Reverse reader - Not implemented yet (NA -> Why do we need ?)
   // but present here for plumbing for future implementation
   private final boolean reverseReader;
@@ -174,7 +174,6 @@ public abstract class AbstractHoodieLogRecordReader {
     this.totalLogFiles.addAndGet(logFilePaths.size());
     this.logFilePaths = logFilePaths;
     this.reverseReader = reverseReader;
-    this.readBlocksLazily = readBlocksLazily;
     this.fs = fs;
     this.bufferSize = bufferSize;
     this.instantRange = instantRange;
@@ -224,6 +223,9 @@ public abstract class AbstractHoodieLogRecordReader {
 
   private void scanInternalV1(Option<KeySpec> keySpecOpt) {
     currentInstantLogBlocks = new ArrayDeque<>();
+    List<HoodieLogBlock> validLogBlockInstants = new ArrayList<>();
+    Map<String, Map<Long, List<Pair<Integer, HoodieLogBlock>>>> blockSequenceMapPerCommit = new HashMap<>();
+
     progress = 0.0f;
     totalLogFiles = new AtomicLong(0);
     totalRollbacks = new AtomicLong(0);
@@ -238,7 +240,7 @@ public abstract class AbstractHoodieLogRecordReader {
       // Iterate over the paths
       logFormatReaderWrapper = new HoodieLogFormatReader(fs,
           logFilePaths.stream().map(logFile -> new HoodieLogFile(new CachingPath(logFile))).collect(Collectors.toList()),
-          readerSchema, readBlocksLazily, reverseReader, bufferSize, shouldLookupRecords(), recordKeyField, internalSchema);
+          readerSchema, true, reverseReader, bufferSize, shouldLookupRecords(), recordKeyField, internalSchema);
 
       Set<HoodieLogFile> scannedLogFiles = new HashSet<>();
       while (logFormatReaderWrapper.hasNext()) {
@@ -249,6 +251,14 @@ public abstract class AbstractHoodieLogRecordReader {
        // Use the HoodieLogFileReader to iterate through the blocks in the log file
        HoodieLogBlock logBlock = logFormatReaderWrapper.next();
        final String instantTime = logBlock.getLogBlockHeader().get(INSTANT_TIME);
+        final String blockSequenceNumberStr = logBlock.getLogBlockHeader().getOrDefault(BLOCK_SEQUENCE_NUMBER, "");
+        int blockSeqNo = -1;
+        long attemptNo = -1L;
+        if (!StringUtils.isNullOrEmpty(blockSequenceNumberStr)) {
+          String[] parts = blockSequenceNumberStr.split(",");
+          attemptNo = Long.parseLong(parts[0]);
+          blockSeqNo = Integer.parseInt(parts[1]);
+        }
         totalLogBlocks.incrementAndGet();
         if (logBlock.getBlockType() != CORRUPT_BLOCK
            && !HoodieTimeline.compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME), HoodieTimeline.LESSER_THAN_OR_EQUALS, this.latestInstantTime
@@ -271,25 +281,18 @@ public abstract class AbstractHoodieLogRecordReader {
           case HFILE_DATA_BLOCK:
           case AVRO_DATA_BLOCK:
           case PARQUET_DATA_BLOCK:
-            LOG.info("Reading a data block from file " + logFile.getPath() + " 
at instant "
-                + logBlock.getLogBlockHeader().get(INSTANT_TIME));
-            if (isNewInstantBlock(logBlock) && !readBlocksLazily) {
-              // If this is an avro data block belonging to a different 
commit/instant,
-              // then merge the last blocks and records into the main result
-              processQueuedBlocksForInstant(currentInstantLogBlocks, 
scannedLogFiles.size(), keySpecOpt);
-            }
+            LOG.info("Reading a data block from file " + logFile.getPath() + " 
at instant " + instantTime);
             // store the current block
             currentInstantLogBlocks.push(logBlock);
+            validLogBlockInstants.add(logBlock);
+            updateBlockSequenceTracker(logBlock, instantTime, blockSeqNo, 
attemptNo, blockSequenceMapPerCommit);
             break;
           case DELETE_BLOCK:
             LOG.info("Reading a delete block from file " + logFile.getPath());
-            if (isNewInstantBlock(logBlock) && !readBlocksLazily) {
-              // If this is a delete data block belonging to a different 
commit/instant,
-              // then merge the last blocks and records into the main result
-              processQueuedBlocksForInstant(currentInstantLogBlocks, 
scannedLogFiles.size(), keySpecOpt);
-            }
             // store deletes so can be rolled back
             currentInstantLogBlocks.push(logBlock);
+            validLogBlockInstants.add(logBlock);
+            updateBlockSequenceTracker(logBlock, instantTime, blockSeqNo, 
attemptNo, blockSequenceMapPerCommit);
             break;
           case COMMAND_BLOCK:
             // Consider the following scenario
@@ -334,6 +337,25 @@ public abstract class AbstractHoodieLogRecordReader {
                   return false;
                 });
 
+                // remove entire entry from blockSequenceTracker
+                blockSequenceMapPerCommit.remove(targetInstantForCommandBlock);
+
+                /// remove all matching log blocks from valid list tracked so far
+                validLogBlockInstants = validLogBlockInstants.stream().filter(block -> {
+                  // handle corrupt blocks separately since they may not have metadata
+                  if (block.getBlockType() == CORRUPT_BLOCK) {
+                    LOG.info("Rolling back the last corrupted log block read in " + logFile.getPath());
+                    return true;
+                  }
+                  if (targetInstantForCommandBlock.contentEquals(block.getLogBlockHeader().get(INSTANT_TIME))) {
+                    // rollback older data block or delete block
+                    LOG.info(String.format("Rolling back an older log block read from %s with instantTime %s",
+                        logFile.getPath(), targetInstantForCommandBlock));
+                    return false;
+                  }
+                  return true;
+                }).collect(Collectors.toList());
+
                 final int numBlocksRolledBack = instantLogBlockSizeBeforeRollback - currentInstantLogBlocks.size();
                 totalRollbacks.addAndGet(numBlocksRolledBack);
                 LOG.info("Number of applied rollback blocks " + numBlocksRolledBack);
@@ -351,6 +373,9 @@ public abstract class AbstractHoodieLogRecordReader {
             totalCorruptBlocks.incrementAndGet();
             // If there is a corrupt block - we will assume that this was the next data block
             currentInstantLogBlocks.push(logBlock);
+            validLogBlockInstants.add(logBlock);
+            // we don't need to update the block sequence tracker here, since the block sequence tracker is meant to remove additional/spurious valid log blocks.
+            // anyway, contents of corrupt blocks are not read.
             break;
           default:
             throw new UnsupportedOperationException("Block type not supported yet");
@@ -358,9 +383,20 @@ public abstract class AbstractHoodieLogRecordReader {
       }
       // merge the last read block when all the blocks are done reading
       if (!currentInstantLogBlocks.isEmpty()) {
-        LOG.info("Merging the final data blocks");
-        processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt);
+        Pair<Boolean, List<HoodieLogBlock>> dedupedLogBlocksInfo = reconcileSpuriousBlocksAndGetValidOnes(validLogBlockInstants, blockSequenceMapPerCommit);
+        if (dedupedLogBlocksInfo.getKey()) {
+          // if there are duplicate log blocks that need to be removed, we re-create the queue for valid log blocks from dedupedLogBlocks
+          currentInstantLogBlocks = new ArrayDeque<>();
+          dedupedLogBlocksInfo.getValue().forEach(block -> currentInstantLogBlocks.push(block));
+          LOG.info("Merging the final data blocks");
+          processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt);
+        } else {
+          // if there are no dups, we can take currentInstantLogBlocks as is.
+          LOG.info("Merging the final data blocks");
+          processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt);
+        }
       }
+
       // Done
       progress = 1.0f;
     } catch (IOException e) {
@@ -381,6 +417,101 @@ public abstract class AbstractHoodieLogRecordReader {
     }
   }
 
+  /**
+   * There could be spurious log blocks due to spark task retries. So, we will use BLOCK_SEQUENCE_NUMBER in the log block header to detect such spurious log blocks and return
+   * a deduped set of log blocks.
+   * @param allValidLogBlocks all valid log blocks parsed so far.
+   * @param blockSequenceMapPerCommit map containing block sequence numbers for every commit.
+   * @return a Pair of a boolean and the list of deduped valid log blocks, where true means duplicates were detected.
+   */
+  private Pair<Boolean, List<HoodieLogBlock>> reconcileSpuriousBlocksAndGetValidOnes(List<HoodieLogBlock> allValidLogBlocks,
+                                                                                     Map<String, Map<Long, List<Pair<Integer, HoodieLogBlock>>>> blockSequenceMapPerCommit) {
+
+    boolean dupsFound = blockSequenceMapPerCommit.values().stream().anyMatch(perCommitBlockList -> perCommitBlockList.size() > 1);
+    if (dupsFound) {
+      // duplicates are found. we need to remove duplicate log blocks.
+      for (Map.Entry<String, Map<Long, List<Pair<Integer, HoodieLogBlock>>>> entry: blockSequenceMapPerCommit.entrySet()) {
+        Map<Long, List<Pair<Integer, HoodieLogBlock>>> perCommitBlockSequences = entry.getValue();
+        if (perCommitBlockSequences.size() > 1) {
+          // only those that have more than 1 sequence need deduping.
+          int maxSequenceCount = -1;
+          int maxAttemptNo = -1;
+          int totalSequences = perCommitBlockSequences.size();
+          int counter = 0;
+          for (Map.Entry<Long, List<Pair<Integer, HoodieLogBlock>>> perAttemptEntries : perCommitBlockSequences.entrySet()) {
+            Long attemptNo = perAttemptEntries.getKey();
+            int size = perAttemptEntries.getValue().size();
+            if (maxSequenceCount < size) {
+              maxSequenceCount = size;
+              maxAttemptNo = Math.toIntExact(attemptNo);
+            }
+            counter++;
+          }
+          // for other sequences (!= maxSequenceIndex), we need to remove the corresponding logBlocks from allValidLogBlocks
+          for (Map.Entry<Long, List<Pair<Integer, HoodieLogBlock>>> perAttemptEntries : perCommitBlockSequences.entrySet()) {
+            Long attemptNo = perAttemptEntries.getKey();
+            if (maxAttemptNo != attemptNo) {
+              List<HoodieLogBlock> logBlocksToRemove = perCommitBlockSequences.get(attemptNo).stream().map(pair -> pair.getValue()).collect(Collectors.toList());
+              logBlocksToRemove.forEach(logBlockToRemove -> allValidLogBlocks.remove(logBlockToRemove));
+            }
+          }
+            }
+          }
+        }
+      }
+      return Pair.of(true, allValidLogBlocks);
+    } else {
+      return Pair.of(false, allValidLogBlocks);
+    }
+  }
+
+  /**
+   * Updates the map tracking block sequence numbers.
+   * Here is the map structure.
+   * Map<String, Map<Long, List<Pair<Integer, HoodieLogBlock>>>> blockSequenceMapPerCommit
+   * Key: Commit time.
+   * Value: Map<Long, List<Pair<Integer, HoodieLogBlock>>>
+   *   Value refers to a Map of different attempts for the commit of interest. List contains the block seq number and the resp HoodieLogBlock.
+   *
+   *  For eg, if there were two attempts for a file slice while writing (due to spark task retries), here is how the map might look like
+   *  key: commit1
+   *  value : {
+   *    0L = List = { {0, lb1}, {1, lb2} },
+   *    1L = List = { {0, lb3}, {1, lb4}, {2, lb5}}
+   *  }
+   *  Meaning: for commit1, there were two attempts with the Append Handle while writing. In the first attempt, lb1 and lb2 were added, and in the second attempt lb3, lb4 and lb5 were added.
+   *  We keep populating this entire map and finally detect spurious log blocks and ignore them.
+   *  In most cases, we might just see one sequence for a given commit.
+   *
+   * @param logBlock log block of interest to be added.
+   * @param instantTime commit time of interest.
+   * @param blockSeqNo block sequence number.
+   * @param attemptNo task attempt number that wrote the block.
+   * @param blockSequenceMapPerCommit map tracking per commit block sequences.
+   */
+  private void updateBlockSequenceTracker(HoodieLogBlock logBlock, String instantTime, int blockSeqNo, long attemptNo,
+                                          Map<String, Map<Long, List<Pair<Integer, HoodieLogBlock>>>> blockSequenceMapPerCommit) {
+    if (blockSeqNo != -1 && attemptNo != -1) { // update the block sequence tracker for log blocks containing the same.
+      blockSequenceMapPerCommit.computeIfAbsent(instantTime, entry -> new HashMap<>());
+      Map<Long, List<Pair<Integer, HoodieLogBlock>>> curCommitBlockMap = blockSequenceMapPerCommit.get(instantTime);
+      if (curCommitBlockMap.containsKey(attemptNo)) {
+        // append to existing map entry
+        curCommitBlockMap.get(attemptNo).add(Pair.of(blockSeqNo, logBlock));
+      } else {
+        // create a new map entry
+        curCommitBlockMap.put(attemptNo, new ArrayList<>());
+        curCommitBlockMap.get(attemptNo).add(Pair.of(blockSeqNo, logBlock));
+      }
+      // update the latest to block sequence tracker
+      blockSequenceMapPerCommit.put(instantTime, curCommitBlockMap);
+    } else {
+      // all older blocks are considered valid. there should be only one list for older commits where block sequence number is not present.
+      blockSequenceMapPerCommit.computeIfAbsent(instantTime, entry -> new HashMap<>());
+      Map<Long, List<Pair<Integer, HoodieLogBlock>>> curCommitBlockMap = blockSequenceMapPerCommit.get(instantTime);
+      curCommitBlockMap.put(0L, new ArrayList<>());
+      curCommitBlockMap.get(0L).add(Pair.of(blockSeqNo, logBlock));
+      // update the latest to block sequence tracker
+      blockSequenceMapPerCommit.put(instantTime, curCommitBlockMap);
+    }
+  }
+
   private void scanInternalV2(Option<KeySpec> keySpecOption, boolean skipProcessingBlocks) {
     currentInstantLogBlocks = new ArrayDeque<>();
     progress = 0.0f;
@@ -397,7 +528,7 @@ public abstract class AbstractHoodieLogRecordReader {
       // Iterate over the paths
       logFormatReaderWrapper = new HoodieLogFormatReader(fs,
           logFilePaths.stream().map(logFile -> new HoodieLogFile(new CachingPath(logFile))).collect(Collectors.toList()),
-          readerSchema, readBlocksLazily, reverseReader, bufferSize, shouldLookupRecords(), recordKeyField, internalSchema);
+          readerSchema, true, reverseReader, bufferSize, shouldLookupRecords(), recordKeyField, internalSchema);
 
       /**
        * Scanning log blocks and placing the compacted blocks at the right place require two traversals.
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
index 3ac161cbe1c..efec05c857c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
@@ -168,7 +168,7 @@ public abstract class HoodieLogBlock {
    * new enums at the end.
    */
   public enum HeaderMetadataType {
-    INSTANT_TIME, TARGET_INSTANT_TIME, SCHEMA, COMMAND_BLOCK_TYPE, COMPACTED_BLOCK_TIMES, RECORD_POSITIONS
+    INSTANT_TIME, TARGET_INSTANT_TIME, SCHEMA, COMMAND_BLOCK_TYPE, COMPACTED_BLOCK_TIMES, RECORD_POSITIONS, BLOCK_SEQUENCE_NUMBER
   }
 
   /**
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index 9da97a07333..f0ca8ef9944 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -108,6 +108,7 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static java.util.stream.Collectors.toList;
 import static org.apache.hudi.common.testutils.HoodieTestUtils.getJavaVersion;
 import static org.apache.hudi.common.testutils.HoodieTestUtils.shouldUseExternalHdfs;
 import static org.apache.hudi.common.testutils.HoodieTestUtils.useExternalHdfs;
@@ -528,7 +529,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     HoodieLogBlock nextBlock = reader.next();
     HoodieDataBlock dataBlockRead = (HoodieDataBlock) nextBlock;
     List<IndexedRecord> recordsRead1 = getRecords(dataBlockRead);
-    assertEquals(copyOfRecords1.size(),recordsRead1.size(),
+    assertEquals(copyOfRecords1.size(), recordsRead1.size(),
         "Read records size should be equal to the written records size");
     assertEquals(copyOfRecords1, recordsRead1,
         "Both records lists should be the same. (ordering guaranteed)");
@@ -687,6 +688,108 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     scanner.close();
   }
 
+  @Test
+  public void testBasicAppendsWithBlockSeqNos() throws IOException, URISyntaxException, InterruptedException {
+    testAppendsWithSpruiousLogBlocks(true, (partitionPath, schema, genRecords, numFiles, enableBlockSeqNos) -> {
+      return writeLogFiles(partitionPath, schema, genRecords, numFiles, enableBlockSeqNos);
+    });
+  }
+
+  @Test
+  public void testAppendsWithSpruiousLogBlocksExactDup() throws IOException, URISyntaxException, InterruptedException {
+    testAppendsWithSpruiousLogBlocks(true, (partitionPath, schema, genRecords, numFiles, enableBlockSeqNos) -> {
+      Set<HoodieLogFile> logFiles = writeLogFiles(partitionPath, schema, genRecords, numFiles, enableBlockSeqNos);
+      // re add the same records again
+      logFiles.addAll(writeLogFiles(partitionPath, schema, genRecords, numFiles, enableBlockSeqNos));
+      return logFiles;
+    });
+  }
+
+  @Test
+  public void testAppendsWithSpruiousLogBlocksFirstAttemptPartial() throws IOException, URISyntaxException, InterruptedException {
+    testAppendsWithSpruiousLogBlocks(true, (partitionPath, schema, genRecords, numFiles, enableBlockSeqNos) -> {
+      Set<HoodieLogFile> logFiles = writeLogFiles(partitionPath, schema, genRecords, numFiles, enableBlockSeqNos);
+      // removing 4th log block to simulate partial failure in 1st attempt
+      List<HoodieLogFile> logFileList = new ArrayList<>(logFiles);
+      logFiles.remove(logFileList.get(logFileList.size() - 1));
+      // re add the same records again
+      logFiles.addAll(writeLogFiles(partitionPath, schema, genRecords, numFiles, enableBlockSeqNos));
+      return logFiles;
+    });
+  }
+
+  @Test
+  public void testAppendsWithSpruiousLogBlocksSecondAttemptPartial() throws IOException, URISyntaxException, InterruptedException {
+    testAppendsWithSpruiousLogBlocks(true, (partitionPath, schema, genRecords, numFiles, enableBlockSeqNos) -> {
+      Set<HoodieLogFile> logFiles = writeLogFiles(partitionPath, schema, genRecords, numFiles, enableBlockSeqNos);
+      // re add the same records again
+      Set<HoodieLogFile> logFilesSet2 = writeLogFiles(partitionPath, schema, genRecords, numFiles, enableBlockSeqNos);
+      // removing 4th log block to simulate partial failure in 2nd attempt
+      List<HoodieLogFile> logFileList2 = new ArrayList<>(logFilesSet2);
+      logFilesSet2.remove(logFileList2.get(logFileList2.size() - 1));
+      logFiles.addAll(logFilesSet2);
+      return logFiles;
+    });
+  }
+
+  private void testAppendsWithSpruiousLogBlocks(
+      boolean enableOptimizedLogBlocksScan,
+      Function5<Set<HoodieLogFile>, Path, Schema, List<IndexedRecord>, Integer, Boolean> logGenFunc)
+      throws IOException, URISyntaxException, InterruptedException {
+
+    Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
+    SchemaTestUtil testUtil = new SchemaTestUtil();
+    List<IndexedRecord> genRecords = testUtil.generateHoodieTestRecords(0, 400);
+    Set<HoodieLogFile> logFiles = logGenFunc.apply(partitionPath, schema, genRecords, 4, true);
+
+    FileCreateUtils.createDeltaCommit(basePath, "100", fs);
+
+    HoodieMergedLogRecordScanner scanner = getLogRecordScanner(logFiles, schema, enableOptimizedLogBlocksScan);
+    // even though we have duplicate records, due to block sequence reconciliation, only one set of blocks should be parsed as valid
+    assertRecordsAndCloseScanner(scanner, genRecords, schema);
+  }
+
+  private void assertRecordsAndCloseScanner(HoodieMergedLogRecordScanner scanner, List<IndexedRecord> genRecords, Schema schema) throws IOException {
+    List<IndexedRecord> scannedRecords = new ArrayList<>();
+    for (HoodieRecord record : scanner) {
+      scannedRecords.add((IndexedRecord)
+          ((HoodieAvroRecord) record).getData().getInsertValue(schema).get());
+    }
+
+    assertEquals(sort(genRecords), sort(scannedRecords),
+        "Scanner records count should be the same as appended records");
+    scanner.close();
+  }
+
+  private HoodieMergedLogRecordScanner getLogRecordScanner(Set<HoodieLogFile> logFiles, Schema schema,
+                                                           boolean enableOptimizedLogBlocksScan) {
+
+    // scan all log blocks (across multiple log files)
+    return HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(fs)
+        .withBasePath(basePath)
+        .withLogFilePaths(
+            logFiles.stream().sorted(HoodieLogFile.getLogFileComparator())
+                .map(l -> l.getPath().toString()).collect(toList()))
+        .withReaderSchema(schema)
+        .withLatestInstantTime("100")
+        .withMaxMemorySizeInBytes(10240L)
+        .withReadBlocksLazily(true)
+        .withReverseReader(false)
+        .withBufferSize(BUFFER_SIZE)
+        .withSpillableMapBasePath(spillableBasePath)
+        .withDiskMapType(ExternalSpillableMap.DiskMapType.BITCASK)
+        .withBitCaskDiskMapCompressionEnabled(true)
+        .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
+        .build();
+  }
+
+  @FunctionalInterface
+  public interface Function5<R, T1, T2, T3, T4, T5> {
+
+    R apply(T1 v1, T2 v2, T3 v3, T4 v4, T5 v5) throws IOException, InterruptedException;
+  }
+
   @ParameterizedTest
   @MethodSource("testArguments")
   public void testBasicAppendAndPartialScanning(ExternalSpillableMap.DiskMapType diskMapType,
@@ -1316,7 +1419,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
     scanner.forEach(s -> {
       try {
-        if (!((HoodieRecordPayload)s.getData()).getInsertValue(schema).isPresent()) {
+        if (!((HoodieRecordPayload) s.getData()).getInsertValue(schema).isPresent()) {
           emptyPayloads.add(true);
         }
       } catch (IOException io) {
@@ -1422,7 +1525,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
 
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102");
     HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deletedKeys.stream().map(deletedKey ->
-        DeleteRecord.create(deletedKey.getRecordKey(), deletedKey.getPartitionPath()))
+            DeleteRecord.create(deletedKey.getRecordKey(), deletedKey.getPartitionPath()))
         .collect(Collectors.toList()).toArray(new DeleteRecord[0]), header);
     writer.appendBlock(deleteBlock);
 
@@ -1443,7 +1546,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     deleteBlockHeader.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102");
     deleteBlock = new HoodieDeleteBlock(
         deletedKeys.stream().map(deletedKey ->
-            DeleteRecord.create(deletedKey.getRecordKey(), deletedKey.getPartitionPath()))
+                DeleteRecord.create(deletedKey.getRecordKey(), deletedKey.getPartitionPath()))
             .collect(Collectors.toList()).toArray(new DeleteRecord[0]), deleteBlockHeader);
     writer.appendBlock(deleteBlock);
 
@@ -1586,7 +1689,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     scanner.forEach(s -> readKeys.add(s.getRecordKey()));
     scanner.forEach(s -> {
       try {
-        if (!((HoodieRecordPayload)s.getData()).getInsertValue(schema).isPresent()) {
+        if (!((HoodieRecordPayload) s.getData()).getInsertValue(schema).isPresent()) {
           emptyPayloadKeys.add(s.getRecordKey());
         }
       } catch (IOException io) {
@@ -2268,7 +2371,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
         .sorted()
         .collect(Collectors.toList());
     List<String> validBlockInstants = scanner.getValidBlockInstants();
-    List<String> expectedBlockInstants = Arrays.asList("108","105", "104");
+    List<String> expectedBlockInstants = Arrays.asList("108", "105", "104");
     assertEquals(expectedBlockInstants, validBlockInstants);
     Collections.sort(readKeys);
     assertEquals(expectedRecords, readKeys, "Record keys read should be exactly same.");
@@ -2523,7 +2626,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     HoodieLogFile logFile = new HoodieLogFile(writer.getLogFile().getPath(), fs.getFileStatus(writer.getLogFile().getPath()).getLen());
 
     try (HoodieLogFileReader reader =
-        new HoodieLogFileReader(fs, logFile, schema, BUFFER_SIZE, readBlocksLazily, true)) {
+             new HoodieLogFileReader(fs, logFile, schema, BUFFER_SIZE, readBlocksLazily, true)) {
 
       assertTrue(reader.hasPrev(), "Last block should be available");
       HoodieLogBlock block = reader.prev();
@@ -2656,7 +2759,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     BenchmarkCounter.initCounterFromReporter(HadoopMapRedUtils.createTestReporter(), fs.getConf());
 
     // NOTE: Have to use this ugly hack since List generic is not covariant in its type param
-    HoodieDataBlock dataBlock = getDataBlock(dataBlockType, (List<IndexedRecord>)(List) records, header);
+    HoodieDataBlock dataBlock = getDataBlock(dataBlockType, (List<IndexedRecord>) (List) records, header);
 
     writer.appendBlock(dataBlock);
     writer.close();
@@ -2772,6 +2875,15 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
                                                   Schema schema,
                                                   List<IndexedRecord> records,
                                                   int numFiles) throws IOException, InterruptedException {
+    return writeLogFiles(partitionPath, schema, records, numFiles, false);
+  }
+
+  private static Set<HoodieLogFile> writeLogFiles(Path partitionPath,
+                                                  Schema schema,
+                                                  List<IndexedRecord> records,
+                                                  int numFiles,
+                                                  boolean enableBlockSequenceNumbers) throws IOException, InterruptedException {
+    int blockSeqNo = 0;
     Writer writer =
         HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
            .withSizeThreshold(1024).withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
@@ -2793,8 +2905,10 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
       List<IndexedRecord> targetRecords = records.subList(offset, offset + targetRecordsCount);
 
       logFiles.add(writer.getLogFile());
+      if (enableBlockSequenceNumbers) {
+        header = getUpdatedHeader(header, blockSeqNo++);
+      }
       writer.appendBlock(getDataBlock(DEFAULT_DATA_BLOCK_TYPE, targetRecords, header));
-
       filesWritten++;
     }
 
@@ -2803,6 +2917,13 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     return logFiles;
   }
 
+  private static Map<HeaderMetadataType, String> getUpdatedHeader(Map<HeaderMetadataType, String> header, int blockSequenceNumber) {
+    Map<HeaderMetadataType, String> updatedHeader = new HashMap<>();
+    updatedHeader.putAll(header);
+    updatedHeader.put(HeaderMetadataType.BLOCK_SEQUENCE_NUMBER, String.valueOf(blockSequenceNumber));
+    return updatedHeader;
+  }
+
   /**
    * Utility to convert the given iterator to a List.
    */
@@ -2860,8 +2981,8 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
   }
 
   private void checkLogBlocksAndKeys(String latestInstantTime, Schema schema, boolean readBlocksLazily,
-      ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled, boolean enableOptimizedLogBlocksScan, int expectedTotalRecords,
-      int expectedTotalKeys, Option<Set<String>> expectedKeys) throws IOException {
+                                     ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled, boolean enableOptimizedLogBlocksScan, int expectedTotalRecords,
+                                     int expectedTotalKeys, Option<Set<String>> expectedKeys) throws IOException {
     List<String> allLogFiles =
         FSUtils.getAllLogFiles(fs, partitionPath, "test-fileid1", HoodieLogFile.DELTA_EXTENSION, "100")
             .map(s -> s.getPath().toString()).collect(Collectors.toList());
