[ https://issues.apache.org/jira/browse/HUDI-5597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yue Zhang updated HUDI-5597:
----------------------------
    Fix Version/s: 0.14.0
                       (was: 0.13.1)

> Deltastreamer ingestion fails when consistent hashing index is used
> -------------------------------------------------------------------
>
>                 Key: HUDI-5597
>                 URL: https://issues.apache.org/jira/browse/HUDI-5597
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: writer-core
>    Affects Versions: 0.13.0
>            Reporter: sivabalan narayanan
>            Priority: Critical
>             Fix For: 0.14.0
>
>
> I tested the consistent hashing index with a Deltastreamer pipeline, but it failed with the exception below. The same pipeline works without any issues with the default index.
>  
> Additional configs I used (a sketch of the full properties file follows this list):
> hoodie.index.type=BUCKET
> hoodie.index.bucket.engine=CONSISTENT_HASHING
> hoodie.bucket.index.num.buckets=4
> hoodie.compact.inline.max.delta.commits=2
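> 
> To show where these sit, here is a minimal sketch of the kind of properties file I pass to Deltastreamer via --props. The record key, partition path, and source directory values are placeholders for illustration, not the exact values from my run:
> {code}
> # Sketch only: field names and paths below are placeholders
> cat > /tmp/dfs-source.properties <<'EOF'
> # record key / partition path of the test dataset (placeholders)
> hoodie.datasource.write.recordkey.field=key
> hoodie.datasource.write.partitionpath.field=partition
> # directory that ParquetDFSSource polls for new parquet files (placeholder)
> hoodie.deltastreamer.source.dfs.root=file:///tmp/parquet-input
> # consistent hashing bucket index configs under test
> hoodie.index.type=BUCKET
> hoodie.index.bucket.engine=CONSISTENT_HASHING
> hoodie.bucket.index.num.buckets=4
> # compact after every 2 delta commits so the failure reproduces quickly
> hoodie.compact.inline.max.delta.commits=2
> EOF
> {code}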
>  
> I have some Parquet data in a directory. I am starting a Deltastreamer with the ParquetDFSSource for a MOR table, setting the additional configs as shown above.
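> 
> The launch looks roughly like the following. This is only a sketch: the bundle jar version, base path, table name, and ordering field are placeholders rather than the exact values from my run.
> {code}
> # Sketch of the Deltastreamer launch; paths, jar version, table name, and ordering field are placeholders
> spark-submit \
>   --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
>   hudi-utilities-bundle_2.12-0.13.0.jar \
>   --table-type MERGE_ON_READ \
>   --op UPSERT \
>   --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
>   --source-ordering-field ts \
>   --props file:///tmp/dfs-source.properties \
>   --target-base-path file:///tmp/hudi_consistent_hashing_test \
>   --target-table consistent_hashing_test \
>   --continuous
> {code}
> 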
> I did make some minor fixes to my branch (compared to master), but that is only to enable inline compaction with Deltastreamer continuous mode; in general, only async compaction is allowed with Deltastreamer continuous mode. I just wanted to test inline for now. Apart from that, I am using the latest master to test.
> {code:java}
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 100.0 failed 1 times, most recent failure: Lost task 1.0 in stage 100.0 (TID 176, localhost, executor driver): org.apache.hudi.exception.HoodieException: Unsupported Operation Exception
>       at org.apache.hudi.common.util.collection.BitCaskDiskMap.values(BitCaskDiskMap.java:303)
>       at org.apache.hudi.common.util.collection.ExternalSpillableMap.values(ExternalSpillableMap.java:275)
>       at java.util.Collections$UnmodifiableMap.values(Collections.java:1487)
>       at org.apache.hudi.io.HoodieMergeHandle.writeIncomingRecords(HoodieMergeHandle.java:397)
>       at org.apache.hudi.io.HoodieMergeHandle.close(HoodieMergeHandle.java:409)
>       at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:168)
>       at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.handleUpdateInternal(HoodieSparkCopyOnWriteTable.java:224)
>       at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.handleUpdate(HoodieSparkCopyOnWriteTable.java:215)
>       at org.apache.hudi.table.action.compact.CompactionExecutionHelper.writeFileAndGetWriteStats(CompactionExecutionHelper.java:64)
>       at org.apache.hudi.table.action.compact.HoodieCompactor.compact(HoodieCompactor.java:231)
>       at org.apache.hudi.table.action.compact.HoodieCompactor.lambda$compact$9cd4b1be$1(HoodieCompactor.java:129)
>       at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)
>       at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
>       at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
>       at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
>       at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
>       at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:349)
>       at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
>       at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
>       at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
>       at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
>       at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
>       at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
>       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>       at org.apache.spark.scheduler.Task.run(Task.scala:123)
>       at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>       at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
> Driver stacktrace:
>       at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1925)
>       at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1913)
>       at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1912)
>       at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>       at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1912)
>       at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
>       at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
>       at scala.Option.foreach(Option.scala:257)
>       at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:948)
>       at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2146)
>       at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2095)
>       at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2084)
>       at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
>       at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:759)
>       at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
>       at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
>       at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
>       at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
>       at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:990)
>       at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>       at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>       at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
>       at org.apache.spark.rdd.RDD.collect(RDD.scala:989)
>       at org.apache.spark.api.java.JavaRDDLike$class.collect(JavaRDDLike.scala:361)
>       at org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:45)
>       at org.apache.hudi.data.HoodieJavaRDD.collectAsList(HoodieJavaRDD.java:155)
>       at org.apache.hudi.table.action.compact.RunCompactionActionExecutor.execute(RunCompactionActionExecutor.java:101)
>       ... 19 more {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
