[ https://issues.apache.org/jira/browse/HUDI-2841 ]
sivabalan narayanan updated HUDI-2841:
--------------------------------------
    Summary: Listing based Rollback of a failed delta commit w/ multi-writer fails  (was: Rollback of a failed delta commit w/ multi-writer fails)

Listing based Rollback of a failed delta commit w/ multi-writer fails
---------------------------------------------------------------------

                Key: HUDI-2841
                URL: https://issues.apache.org/jira/browse/HUDI-2841
            Project: Apache Hudi
         Issue Type: Sub-task
   Affects Versions: 0.10.0
           Reporter: sivabalan narayanan
           Assignee: sivabalan narayanan
           Priority: Blocker
             Labels: pull-request-available
            Fix For: 0.10.0

In a multi-writer setup, the cleaner policy has to be set to lazy, and failed commits are rolled back only by the cleaner, not eagerly.

Listing-based rollback for MOR fails a validation which asserts that no file to be rolled back may have a larger timestamp than the commit being rolled back.

Let's say the timeline is as follows: DC1 and DC2 succeed, and DC3 fails midway.

Partition1/
    baseFile1 (DC1)
    baseFile2 (DC2) // due to small file handling
    baseFile3 (DC3)

The deltastreamer is restarted and performs DC4:

Partition1/
    baseFile1 (DC1)
    baseFile2 (DC2) // due to small file handling
    baseFile3 (DC3)
    baseFile4 (DC4)

At the end of the commit, the cleaner kicks in and tries to roll back any failed commits. In RollbackUtils, where we fetch the latest file slices and find all files to be deleted, there is a validation that the base commit time of every file in the latest file slice must be less than or equal to the commit being rolled back. Here, baseFile4 has a higher timestamp than DC3, so the validation fails.

{code:java}
21/11/23 09:53:49 DEBUG wire: http-outgoing-11 >> "[\r][\n]"
21/11/23 09:53:49 ERROR Executor: Exception in task 32.0 in stage 45.0 (TID 3191)
java.lang.IllegalArgumentException
    at org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:31)
    at org.apache.hudi.table.action.rollback.RollbackUtils.lambda$generateAppendRollbackBlocksAction$0(RollbackUtils.java:254)
    at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:174)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
    at org.apache.hudi.table.action.rollback.RollbackUtils.generateAppendRollbackBlocksAction(RollbackUtils.java:266)
    at org.apache.hudi.table.action.rollback.RollbackUtils.lambda$generateRollbackRequestsUsingFileListingMOR$e97f040e$1(RollbackUtils.java:210)
    at org.apache.hudi.client.common.HoodieSparkEngineContext.lambda$flatMap$7d470b86$1(HoodieSparkEngineContext.java:134)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$flatMap$1(JavaRDDLike.scala:125)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
    at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
    at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
    at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
    at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
    at scala.collection.AbstractIterator.to(Iterator.scala:1429)
    at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
    at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1429)
    at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
    at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1429)
    at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1004)
    at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2139)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
21/11/23 09:53:49 WARN RollbackUtils: Rollback Instant time : 20211123095048787
{code}
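To make the failing check concrete, here is a small standalone sketch (plain Java, no Hudi classes; the class name and the instant strings "001".."004" are stand-ins for DC1..DC4 -- Hudi instants are fixed-width numeric strings, so lexicographic comparison matches time order) of the latest-file-slice validation from the scenario above:

{code:java}
import java.util.LinkedHashMap;
import java.util.Map;

// Standalone model of the validation in RollbackUtils: every base instant in
// the latest file slice must be <= the instant being rolled back.
public class RollbackValidationSketch {
  public static void main(String[] args) {
    String rollbackInstant = "003"; // DC3, the failed delta commit

    // Latest file slices in Partition1 after DC4 completed (per the scenario).
    Map<String, String> fileIdToBaseInstant = new LinkedHashMap<>();
    fileIdToBaseInstant.put("baseFile1", "001"); // DC1
    fileIdToBaseInstant.put("baseFile2", "002"); // DC2, small file handling
    fileIdToBaseInstant.put("baseFile3", "003"); // DC3, the failed commit
    fileIdToBaseInstant.put("baseFile4", "004"); // DC4, written after restart

    fileIdToBaseInstant.forEach((fileId, baseInstant) -> {
      // Mirrors ValidationUtils.checkArgument(base <= rollbackInstant):
      // baseFile4 (004 > 003) trips it, exactly as in the stack trace above.
      if (baseInstant.compareTo(rollbackInstant) > 0) {
        throw new IllegalArgumentException(fileId + " has base instant "
            + baseInstant + " newer than rollback instant " + rollbackInstant);
      }
    });
  }
}
{code}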
Code of interest:

{code:java}
private static List<ListingBasedRollbackRequest> generateAppendRollbackBlocksAction(String partitionPath, HoodieInstant rollbackInstant,
    HoodieCommitMetadata commitMetadata, HoodieTable table) {
  ValidationUtils.checkArgument(rollbackInstant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION));

  // wStat.getPrevCommit() might not give the right commit time in the following
  // scenario : If a compaction was scheduled, the new commitTime associated with the requested compaction will be
  // used to write the new log files. In this case, the commit time for the log file is the compaction requested time.
  // But the index (global) might store the baseCommit of the base and not the requested, hence get the
  // baseCommit always by listing the file slice
  Map<String, String> fileIdToBaseCommitTimeForLogMap = table.getSliceView().getLatestFileSlices(partitionPath)
      .collect(Collectors.toMap(FileSlice::getFileId, FileSlice::getBaseInstantTime));
  return commitMetadata.getPartitionToWriteStats().get(partitionPath).stream().filter(wStat -> {
    // Filter out stats without prevCommit since they are all inserts
    boolean validForRollback = (wStat != null) && (!wStat.getPrevCommit().equals(HoodieWriteStat.NULL_COMMIT))
        && (wStat.getPrevCommit() != null) && fileIdToBaseCommitTimeForLogMap.containsKey(wStat.getFileId());

    if (validForRollback) {
      // For sanity, log instant time can never be less than base-commit on which we are rolling back
      ValidationUtils
          .checkArgument(HoodieTimeline.compareTimestamps(fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId()),
              HoodieTimeline.LESSER_THAN_OR_EQUALS, rollbackInstant.getTimestamp()));
    }

    return validForRollback && HoodieTimeline.compareTimestamps(fileIdToBaseCommitTimeForLogMap.get(
        // Base Ts should be strictly less. If equal (for inserts-to-logs), the caller employs another option
        // to delete and we should not step on it
        wStat.getFileId()), HoodieTimeline.LESSER_THAN, rollbackInstant.getTimestamp());
  }).map(wStat -> {
    String baseCommitTime = fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId());
    return ListingBasedRollbackRequest.createRollbackRequestWithAppendRollbackBlockAction(partitionPath, wStat.getFileId(),
        baseCommitTime);
  }).collect(Collectors.toList());
}
{code}
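For illustration only (a hypothetical sketch, not the fix that actually shipped for this ticket): since lazy cleaning means later successful delta commits can land before the rollback runs, the latest file slice can legitimately contain base instants newer than the instant being rolled back. One conceivable direction is to skip such files rather than hard-failing, reusing the names from the block above:

{code:java}
// Hypothetical alternative to the hard checkArgument above (illustration only):
// a base instant newer than the rollback instant means the file was written by
// a later, successful delta commit (e.g. baseFile4/DC4 while rolling back DC3),
// so it is not part of the failed commit and could simply be excluded.
boolean baseIsNewer = HoodieTimeline.compareTimestamps(
    fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId()),
    HoodieTimeline.GREATER_THAN, rollbackInstant.getTimestamp());
if (baseIsNewer) {
  return false; // skip instead of throwing IllegalArgumentException
}
{code}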
Verified with log statements that the DC4 base file is what runs into the validation error.

The issue could also happen if a compaction had kicked in midway and the cleaner policy was very relaxed, so that the rollback got triggered only after 10 commits or 1 to 2 compactions.

In the code block of interest, we call table.getSliceView().getLatestFileSlices(partitionPath). Instead, I tried to use table.getSliceView().getLatestFileSlicesBeforeOrOn(partitionPath, rollbackInstant.getTimestamp(), false), but this started including files from the next delta commit in the list of files to be deleted as well.
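For reference, that attempted swap reads roughly as follows (a sketch using the same variables as the code of interest above; per the observation just described, it is not a correct fix either):

{code:java}
// Sketch of the attempted variant (same names as the code of interest above).
// Bounding the listing at the rollback instant avoids base files from later
// commits, but as noted it then also picks up files belonging to the next
// delta commit for deletion, so it does not work as-is.
Map<String, String> fileIdToBaseCommitTimeForLogMap =
    table.getSliceView().getLatestFileSlicesBeforeOrOn(partitionPath, rollbackInstant.getTimestamp(), false)
        .collect(Collectors.toMap(FileSlice::getFileId, FileSlice::getBaseInstantTime));
{code}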