[ 
https://issues.apache.org/jira/browse/HUDI-2774?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vinoth Chandar updated HUDI-2774:
---------------------------------
    Sprint: Hudi-Sprint-Jan-3

> Async Clustering via deltastreamer fails with IllegalStateException: Duplicate 
> key [==>20211116123724586__replacecommit__INFLIGHT]
> ---------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: HUDI-2774
>                 URL: https://issues.apache.org/jira/browse/HUDI-2774
>             Project: Apache Hudi
>          Issue Type: Improvement
>            Reporter: sivabalan narayanan
>            Assignee: Sagar Sumit
>            Priority: Blocker
>              Labels: core-flow-ds, pull-request-available, sev:high
>             Fix For: 0.11.0
>
>         Attachments: Screen Shot 2021-11-16 at 12.42.20 PM.png
>
>
> Setup:
> Started deltastreamer with a parquet DFS source; the source folder had no data to begin with. Enabled async clustering with the below props:
> {code}
> hoodie.clustering.async.max.commits=2
> hoodie.clustering.plan.strategy.sort.columns=type,id
> {code}
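> The exact launch command is not shown here; `hoodie.clustering.async.enabled` is the config that turns async clustering on for deltastreamer, so as an assumption about this setup the props presumably also include:
> {code}
> hoodie.clustering.async.enabled=true
> {code}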
> Added one file to the source folder, and deltastreamer failed during this round. The commit itself went through fine, and the first replace commit also looks to have gone through fine, but deltastreamer then failed. I need to understand why deltastreamer tries to schedule a second replace commit at all: it runs in continuous mode, goes into the next round immediately, and there is no more data to sync. 
> Note: there is only one partition and one file group in the entire dataset. 
>  
> The clustering plan seems to be the same in both replacecommit requested meta files:
> {code:java}
> ^@&<93>c%^Z<F1><81>9%<E6>-^K<EF><AC><FC>A^B<BA>^G^B^NCLUSTER^B^B^B^B^B^B<EC>^Afile:/tmp/hudi-deltastreamer-gh-mw/PushEvent/2542ddef-0169-4978-9b1b-84977d6141cf-0_0-49-161_20211116130523827.parquet^B^@^BL2542ddef-0169-4978-9b1b-84977d6141cf-0^B^RPushEvent^B^@^@^B^@^B
> ^^TOTAL_LOG_FILES^@^@^@^@^@^@^@^@^VTOTAL_IO_MB^@^@^@^@^@^@^@^@ 
> TOTAL_IO_READ_MB^@^@^@^@^@^@^@^@(TOTAL_LOG_FILES_SIZE^@^@^@^@^@^@^@^@"TOTAL_IO_WRITE_MB^@^@^@^@^@^@^@^@^@^@^B^@^B^@^B^B<A0>^Aorg.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy^B^BXhoodie.clustering.plan.strategy.sort.columns^Ntype,id^@^@^B^B^@^@^B^B^A^B^@^@^B&<93>c%^Z<F1><81>9%<E6>-^K<EF><AC><FC>A{code}
>  
> {code:java}
> ^@<FB>^L%b<C3>3<85><D7><<BB><A3><B1><BA>
> <A9><89>^B<BA>^G^B^NCLUSTER^B^B^B^B^B^B<EC>^Afile:/tmp/hudi-deltastreamer-gh-mw/PushEvent/2542ddef-0169-4978-9b1b-84977d6141cf-0_0-49-161_20211116130523827.parquet^B^@^BL2542ddef-0169-4978-9b1b-84977d6141cf-0^B^RPushEvent^B^@^@^B^@^B
> ^^TOTAL_LOG_FILES^@^@^@^@^@^@^@^@^VTOTAL_IO_MB^@^@^@^@^@^@^@^@ 
> TOTAL_IO_READ_MB^@^@^@^@^@^@^@^@(TOTAL_LOG_FILES_SIZE^@^@^@^@^@^@^@^@"TOTAL_IO_WRITE_MB^@^@^@^@^@^@^@^@^@^@^B^@^B^@^B^B<A0>^Aorg.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy^B^BXhoodie.clustering.plan.strategy.sort.columns^Ntype,id^@^@^B^B^@^@^B^B^A^B^@^@^B<FB>^L%b<C3>3<85><D7><<BB><A3><B1><BA>
> <A9><89> {code}
>  
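> To double-check that without reading the raw avro bytes, the two plans can be read back through Hudi's own APIs; a sketch, assuming the 0.10.x ClusteringUtils / HoodieTableMetaClient signatures (base path taken from this report):
> {code:java}
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hudi.common.table.HoodieTableMetaClient;
> import org.apache.hudi.common.util.ClusteringUtils;
> 
> public class ComparePendingClusteringPlans {
>   public static void main(String[] args) {
>     HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
>         .setConf(new Configuration())
>         .setBasePath("/tmp/hudi-deltastreamer-gh-mw") // base path from this report
>         .build();
> 
>     // Every pending replacecommit carries a serialized clustering plan;
>     // printing the input groups side by side shows whether the two plans
>     // reference the same file group.
>     metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstants()
>         .forEach(instant -> ClusteringUtils.getClusteringPlan(metaClient, instant)
>             .ifPresent(pair -> System.out.println(
>                 instant + " -> " + pair.getRight().getInputGroups())));
>   }
> }
> {code}
>  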
> Timeline:
> !Screen Shot 2021-11-16 at 12.42.20 PM.png!
>  
> Stacktrace:
> {code:java}
> 21/11/16 13:05:20 WARN HoodieDeltaStreamer: Next round 
> 21/11/16 13:05:20 WARN DeltaSync: Extra metadata :: 20211116130512915, 20211116130512915.commit, = [schema, deltastreamer.checkpoint.key]
> 21/11/16 13:05:23 WARN HoodieDeltaStreamer: Starting async clustering service if required 111 
> 21/11/16 13:05:27 WARN HoodieDeltaStreamer: Scheduled async clustering for instant: 20211116130526895
> 21/11/16 13:05:27 WARN HoodieDeltaStreamer: Next round 
> 21/11/16 13:05:27 WARN DeltaSync: Extra metadata :: 20211116130523827, 20211116130523827.commit, = [schema, deltastreamer.checkpoint.key]
> 21/11/16 13:05:27 WARN HoodieDeltaStreamer: Scheduled async clustering for instant: 20211116130527394
> 21/11/16 13:05:27 WARN HoodieDeltaStreamer: Next round 
> 21/11/16 13:05:27 WARN DeltaSync: Extra metadata :: 20211116130523827, 20211116130523827.commit, = [schema, deltastreamer.checkpoint.key]
> 21/11/16 13:05:28 ERROR Executor: Exception in task 0.0 in stage 74.0 (TID 176)
> java.lang.IllegalStateException: Duplicate key [==>20211116130526895__replacecommit__INFLIGHT]
>       at java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133)
>       at java.util.HashMap.merge(HashMap.java:1254)
>       at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)
>       at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>       at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
>       at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>       at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>       at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
>       at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>       at java.util.Iterator.forEachRemaining(Iterator.java:116)
>       at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
>       at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>       at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>       at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
>       at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
>       at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>       at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
>       at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:270)
>       at java.util.Iterator.forEachRemaining(Iterator.java:116)
>       at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
>       at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>       at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>       at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
>       at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
>       at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>       at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
>       at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:270)
>       at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>       at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
>       at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>       at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>       at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>       at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>       at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>       at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>       at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>       at org.apache.hudi.common.util.ClusteringUtils.getAllFileGroupsInPendingClusteringPlans(ClusteringUtils.java:127)
>       at org.apache.hudi.common.table.view.AbstractTableFileSystemView.init(AbstractTableFileSystemView.java:113)
>       at org.apache.hudi.common.table.view.HoodieTableFileSystemView.init(HoodieTableFileSystemView.java:106)
>       at org.apache.hudi.common.table.view.HoodieTableFileSystemView.<init>(HoodieTableFileSystemView.java:100)
>       at org.apache.hudi.common.table.view.FileSystemViewManager.createInMemoryFileSystemView(FileSystemViewManager.java:168)
>       at org.apache.hudi.common.table.view.FileSystemViewManager.lambda$createViewManager$5fcdabfe$1(FileSystemViewManager.java:259)
>       at org.apache.hudi.common.table.view.FileSystemViewManager.lambda$getFileSystemView$1(FileSystemViewManager.java:111)
>       at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
>       at org.apache.hudi.common.table.view.FileSystemViewManager.getFileSystemView(FileSystemViewManager.java:110)
>       at org.apache.hudi.table.HoodieTable.getSliceView(HoodieTable.java:277)
>       at org.apache.hudi.table.action.cluster.strategy.ClusteringPlanStrategy.getFileSlicesEligibleForClustering(ClusteringPlanStrategy.java:77)
>       at org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy.getFileSlicesEligibleForClustering(SparkSizeBasedClusteringPlanStrategy.java:118)
>       at org.apache.hudi.table.action.cluster.strategy.PartitionAwareClusteringPlanStrategy.lambda$generateClusteringPlan$4e6aac78$1(PartitionAwareClusteringPlanStrategy.java:79)
>       at org.apache.hudi.client.common.HoodieSparkEngineContext.lambda$flatMap$7d470b86$1(HoodieSparkEngineContext.java:134)
>       at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:125)
>       at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:125)
>       at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
>       at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>       at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
>       at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
>       at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
>       at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
>       at scala.collection.AbstractIterator.to(Iterator.scala:1334)
>       at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
>       at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1334)
>       at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
>       at scala.collection.AbstractIterator.toArray(Iterator.scala:1334)
>       at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$15.apply(RDD.scala:990)
>       at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$15.apply(RDD.scala:990)
>       at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
>       at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>       at org.apache.spark.scheduler.Task.run(Task.scala:123)
>       at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>       at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
> 21/11/16 13:05:28 WARN TaskSetManager: Lost task 0.0 in stage 74.0 (TID 176, localhost, executor driver): java.lang.IllegalStateException: Duplicate key [==>20211116130526895__replacecommit__INFLIGHT] {code}
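>  
> The failing frame is ClusteringUtils.getAllFileGroupsInPendingClusteringPlans (ClusteringUtils.java:127 above), which builds a fileGroupId -> pending clustering instant map with Collectors.toMap. Once two pending clustering plans cover the table's single file group, the collector's default merger throws. A minimal, self-contained JDK sketch of that failure mode (not Hudi's actual code; the ids are copied from this report):
> {code:java}
> import java.util.Arrays;
> import java.util.List;
> import java.util.Map;
> import java.util.stream.Collectors;
> 
> public class DuplicateKeySketch {
>   public static void main(String[] args) {
>     // The same file group id, referenced by two pending clustering instants.
>     List<String[]> fileGroupToInstant = Arrays.asList(
>         new String[]{"2542ddef-0169-4978-9b1b-84977d6141cf-0",
>             "==>20211116130526895__replacecommit__INFLIGHT"},
>         new String[]{"2542ddef-0169-4978-9b1b-84977d6141cf-0",
>             "==>20211116130527394__replacecommit__REQUESTED"});
> 
>     // Collectors.toMap without a merge function throws on the duplicate key.
>     // Note: on Java 8 the "Duplicate key" message actually prints the colliding
>     // *value*, which is why the error above shows the replacecommit instant
>     // rather than the file group id.
>     Map<String, String> m = fileGroupToInstant.stream()
>         .collect(Collectors.toMap(e -> e[0], e -> e[1]));
>     System.out.println(m);
>   }
> }
> {code}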
>  
> I tried adding a 10 sec delay between rounds in continuous mode and things were fine: within the 10 sec delay, clustering completes, so the next round does not trigger any scheduling. 
> When I tried with a 3 sec delay, I ran into the same exception as above, and I can see that scheduling happens a second time. 
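> (For reference, in continuous mode this kind of delay is usually injected with HoodieDeltaStreamer's --min-sync-interval-seconds option; that it was used here is an assumption, since the exact command is not shown. Either way the delay only hides the race between clustering completion and the next scheduling round.)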



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
