[ 
https://issues.apache.org/jira/browse/HUDI-2841?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sivabalan narayanan updated HUDI-2841:
--------------------------------------
    Summary: Listing based Rollback of a failed delta commit w/ multi-writer 
fails  (was: Rollback of a failed delta commit w/ multi-writer fails)

> Listing based Rollback of a failed delta commit w/ multi-writer fails
> ---------------------------------------------------------------------
>
>                 Key: HUDI-2841
>                 URL: https://issues.apache.org/jira/browse/HUDI-2841
>             Project: Apache Hudi
>          Issue Type: Sub-task
>    Affects Versions: 0.10.0
>            Reporter: sivabalan narayanan
>            Assignee: sivabalan narayanan
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 0.10.0
>
>
> In a multi-writer setup, the cleaner policy has to be set to lazy, and failed 
> commits are rolled back only by the cleaner, not eagerly.
> Listing-based rollback for MOR fails a validation because a file to be rolled 
> back has a larger timestamp than the commit being rolled back. 
>  
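For readers setting this up, a minimal sketch of the write configs that put a writer into the lazy mode described above. The class and method names are hypothetical; the two keys are Hudi write configs. A multi-writer setup also needs a lock provider, which is left out here.

```java
import java.util.Properties;

// Sketch only: MultiWriterProps and lazyCleaningProps are hypothetical names.
final class MultiWriterProps {
  static Properties lazyCleaningProps() {
    Properties props = new Properties();
    // Enable optimistic concurrency control so several writers can share the table.
    props.setProperty("hoodie.write.concurrency.mode", "optimistic_concurrency_control");
    // Leave failed writes for the cleaner to roll back instead of rolling them back eagerly.
    props.setProperty("hoodie.cleaner.policy.failed.writes", "LAZY");
    return props;
  }

  public static void main(String[] args) {
    lazyCleaningProps().forEach((k, v) -> System.out.println(k + "=" + v));
  }
}
```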
> Let's say the timeline is as follows:
> DC1, DC2, ... and DC3 failed midway.
> Partition1/
>    baseFile1 (DC1)
>    baseFile2 (DC2) // due to small file handling
>    baseFile3 (DC3)
>  
> Restarted deltastreamer, 
> which does DC4.
>  
> Partition1/
>    baseFile1 (DC1)
>    baseFile2 (DC2) // due to small file handling
>    baseFile3 (DC3)
>    baseFile4 (DC4) 
>  
> At the end of the commit, the cleaner kicks in and tries to roll back any 
> failed commits. In RollbackUtils, where we fetch the latest file slices and 
> find all files to be deleted, we have a validation which checks that the base 
> commit time of every file (in the latest file slice) is <= the time of the 
> commit being rolled back. In this case, baseFile4 has a higher timestamp than 
> DC3, and hence the validation fails. 
>  
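The failing check can be reproduced in a toy model, without Hudi on the classpath. This is a sketch under assumptions: instant times are compared as plain strings (they are fixed-width timestamps), the file group IDs and every timestamp other than the rollback instant from the log are made up, and the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: a stand-in for the validation, not Hudi's RollbackUtils.
final class RollbackValidationSketch {
  // Fixed-width timestamp strings order chronologically under string comparison.
  static boolean baseIsBeforeOrOn(String baseInstant, String rollbackInstant) {
    return baseInstant.compareTo(rollbackInstant) <= 0;
  }

  // Look up the base instant of the latest file slice for a file group and
  // compare it with the instant being rolled back.
  static boolean passesValidation(Map<String, String> latestBaseInstantByFileId,
                                  String fileId, String rollbackInstant) {
    return baseIsBeforeOrOn(latestBaseInstantByFileId.get(fileId), rollbackInstant);
  }

  public static void main(String[] args) {
    String dc3 = "20211123095048787"; // rollback instant taken from the log
    String dc4 = "20211123095200000"; // hypothetical later delta commit

    Map<String, String> latest = new HashMap<>();
    latest.put("fileGroup-A", "20211123094000000"); // latest slice predates DC3
    latest.put("fileGroup-B", dc4);                 // latest slice was written by DC4

    System.out.println(passesValidation(latest, "fileGroup-A", dc3)); // true
    System.out.println(passesValidation(latest, "fileGroup-B", dc3)); // false
  }
}
```

Because the map is built from the latest file slices, any file group that a later commit has touched carries a base instant newer than the commit being rolled back, and the check fails for it.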
> {code:java}
> 21/11/23 09:53:49 DEBUG wire: http-outgoing-11 >> "[\r][\n]"
> 21/11/23 09:53:49 ERROR Executor: Exception in task 32.0 in stage 45.0 (TID 3191)
> java.lang.IllegalArgumentException
>         at org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:31)
>         at org.apache.hudi.table.action.rollback.RollbackUtils.lambda$generateAppendRollbackBlocksAction$0(RollbackUtils.java:254)
>         at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:174)
>         at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>         at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>         at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>         at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>         at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>         at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>         at org.apache.hudi.table.action.rollback.RollbackUtils.generateAppendRollbackBlocksAction(RollbackUtils.java:266)
>         at org.apache.hudi.table.action.rollback.RollbackUtils.lambda$generateRollbackRequestsUsingFileListingMOR$e97f040e$1(RollbackUtils.java:210)
>         at org.apache.hudi.client.common.HoodieSparkEngineContext.lambda$flatMap$7d470b86$1(HoodieSparkEngineContext.java:134)
>         at org.apache.spark.api.java.JavaRDDLike.$anonfun$flatMap$1(JavaRDDLike.scala:125)
>         at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
>         at scala.collection.Iterator.foreach(Iterator.scala:941)
>         at scala.collection.Iterator.foreach$(Iterator.scala:941)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
>         at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
>         at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
>         at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
>         at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
>         at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
>         at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
>         at scala.collection.AbstractIterator.to(Iterator.scala:1429)
>         at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
>         at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
>         at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1429)
>         at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
>         at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
>         at scala.collection.AbstractIterator.toArray(Iterator.scala:1429)
>         at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1004)
>         at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2139)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>         at org.apache.spark.scheduler.Task.run(Task.scala:127)
>         at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
>         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> 21/11/23 09:53:49 WARN RollbackUtils: Rollback Instant time : 20211123095048787 {code}
>  
> Code of interest
> {code:java}
> private static List<ListingBasedRollbackRequest> generateAppendRollbackBlocksAction(
>     String partitionPath, HoodieInstant rollbackInstant,
>     HoodieCommitMetadata commitMetadata, HoodieTable table) {
>   ValidationUtils.checkArgument(
>       rollbackInstant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION));
>   // wStat.getPrevCommit() might not give the right commit time in the following
>   // scenario : If a compaction was scheduled, the new commitTime associated with
>   // the requested compaction will be used to write the new log files. In this case,
>   // the commit time for the log file is the compaction requested time.
>   // But the index (global) might store the baseCommit of the base and not the
>   // requested, hence get the baseCommit always by listing the file slice
>   Map<String, String> fileIdToBaseCommitTimeForLogMap =
>       table.getSliceView().getLatestFileSlices(partitionPath)
>           .collect(Collectors.toMap(FileSlice::getFileId, FileSlice::getBaseInstantTime));
>   return commitMetadata.getPartitionToWriteStats().get(partitionPath).stream()
>       .filter(wStat -> {
>         // Filter out stats without prevCommit since they are all inserts
>         boolean validForRollback = (wStat != null)
>             && (!wStat.getPrevCommit().equals(HoodieWriteStat.NULL_COMMIT))
>             && (wStat.getPrevCommit() != null)
>             && fileIdToBaseCommitTimeForLogMap.containsKey(wStat.getFileId());
>         if (validForRollback) {
>           // For sanity, log instant time can never be less than base-commit on
>           // which we are rolling back
>           ValidationUtils.checkArgument(HoodieTimeline.compareTimestamps(
>               fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId()),
>               HoodieTimeline.LESSER_THAN_OR_EQUALS, rollbackInstant.getTimestamp()));
>         }
>         return validForRollback && HoodieTimeline.compareTimestamps(
>             // Base Ts should be strictly less. If equal (for inserts-to-logs),
>             // the caller employs another option to delete and we should not step on it
>             fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId()),
>             HoodieTimeline.LESSER_THAN, rollbackInstant.getTimestamp());
>       })
>       .map(wStat -> {
>         String baseCommitTime = fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId());
>         return ListingBasedRollbackRequest.createRollbackRequestWithAppendRollbackBlockAction(
>             partitionPath, wStat.getFileId(), baseCommitTime);
>       })
>       .collect(Collectors.toList());
> } {code}
> Verified with log statements that the DC4 base file is what runs into the 
> validation error. 
>  
> But the issue could also happen if a compaction had kicked in midway and the 
> cleaner policy was very relaxed, so that the rollback got triggered only 
> after 10 commits or 1 to 2 compactions. 
>  
> In the code block of interest, we call 
> `table.getSliceView().getLatestFileSlices(partitionPath)`. Instead, I tried 
> to use 
> `table.getSliceView().getLatestFileSlicesBeforeOrOn(partitionPath, rollbackInstant.getTimestamp(), false)`. 
> But this also added files written by the next delta commit to the list of 
> files to be deleted. 
>  
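One way to picture the last observation is a toy model in which a file slice is selected by its base instant only, while files appended to that slice later come along with it. This is an assumption made for illustration, not a confirmed root cause; the `Slice` type, the method name, and the short instants are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch only: a toy file slice, not Hudi's FileSlice.
final class SliceSelectionSketch {
  static final class Slice {
    final String baseInstant;        // instant that created the slice
    final List<String> fileInstants; // instants of all files attached to the slice
    Slice(String baseInstant, List<String> fileInstants) {
      this.baseInstant = baseInstant;
      this.fileInstants = fileInstants;
    }
  }

  // Select slices whose base instant is before or on maxInstant and return
  // the instants of every file they contain, with no per-file filtering.
  static List<String> filesOfSlicesBeforeOrOn(List<Slice> slices, String maxInstant) {
    List<String> out = new ArrayList<>();
    for (Slice s : slices) {
      if (s.baseInstant.compareTo(maxInstant) <= 0) {
        out.addAll(s.fileInstants);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    // Slice created at 001, later appended to by 003 (failed) and 004 (next delta commit).
    Slice slice = new Slice("001", Arrays.asList("001", "003", "004"));
    // Rolling back 003 still returns the file written by 004.
    System.out.println(filesOfSlicesBeforeOrOn(Arrays.asList(slice), "003"));
  }
}
```

In this model the slice qualifies because its base instant is old enough, yet it still carries a file from the later commit, which matches the symptom reported above.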



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
