[ https://issues.apache.org/jira/browse/HUDI-5597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yue Zhang updated HUDI-5597: ---------------------------- Fix Version/s: 0.14.0 (was: 0.13.1) > Deltastreamer ingestion fails when consistent hashing index is used > ------------------------------------------------------------------- > > Key: HUDI-5597 > URL: https://issues.apache.org/jira/browse/HUDI-5597 > Project: Apache Hudi > Issue Type: Bug > Components: writer-core > Affects Versions: 0.13.0 > Reporter: sivabalan narayanan > Priority: Critical > Fix For: 0.14.0 > > > I tested consistent hashing index w/ a deltastreamer pipeline. but it failed > w/ below exception. Same pipeline works w/o any issues for default index. > > Additional configs I used > hoodie.index.type=BUCKET > hoodie.index.bucket.engine=CONSISTENT_HASHING > hoodie.bucket.index.num.buckets=4 > hoodie.compact.inline.max.delta.commits=2 > > I have some parquet data in a dir. I am starting a deltastreamer w/ > PArquetDFS source for mor table. setting the additional configs as shown > above. > I did make some minor fixes to my branch (compared to master), but thats only > to enable inline compaction w/ deltastreamer continuous mode. In general, > only async compaction is allowed w/ detlastreamer continuous. I just wanted > to test inline for now. but apart from that, I am using latest master to > test. > {code:java} > Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: > Task 1 in stage 100.0 failed 1 times, most recent failure: Lost task 1.0 in > stage 100.0 (TID 176, localhost, executor driver): > org.apache.hudi.exception.HoodieException: Unsupported Operation Exception > at > org.apache.hudi.common.util.collection.BitCaskDiskMap.values(BitCaskDiskMap.java:303) > at > org.apache.hudi.common.util.collection.ExternalSpillableMap.values(ExternalSpillableMap.java:275) > at java.util.Collections$UnmodifiableMap.values(Collections.java:1487) > at > org.apache.hudi.io.HoodieMergeHandle.writeIncomingRecords(HoodieMergeHandle.java:397) > at > org.apache.hudi.io.HoodieMergeHandle.close(HoodieMergeHandle.java:409) > at > org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:168) > at > org.apache.hudi.table.HoodieSparkCopyOnWriteTable.handleUpdateInternal(HoodieSparkCopyOnWriteTable.java:224) > at > org.apache.hudi.table.HoodieSparkCopyOnWriteTable.handleUpdate(HoodieSparkCopyOnWriteTable.java:215) > at > org.apache.hudi.table.action.compact.CompactionExecutionHelper.writeFileAndGetWriteStats(CompactionExecutionHelper.java:64) > at > org.apache.hudi.table.action.compact.HoodieCompactor.compact(HoodieCompactor.java:231) > at > org.apache.hudi.table.action.compact.HoodieCompactor.lambda$compact$9cd4b1be$1(HoodieCompactor.java:129) > at > org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:410) > at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435) > at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441) > at > org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221) > at > org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:349) > at > org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182) > at > org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156) > at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091) > at > org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156) > at > org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882) > at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:308) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:310) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) > at org.apache.spark.scheduler.Task.run(Task.scala:123) > at > org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Driver stacktrace: > at > org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1925) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1913) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1912) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at > org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1912) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948) > at scala.Option.foreach(Option.scala:257) > at > org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:948) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2146) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2095) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2084) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) > at > org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:759) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126) > at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:990) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) > at org.apache.spark.rdd.RDD.withScope(RDD.scala:385) > at org.apache.spark.rdd.RDD.collect(RDD.scala:989) > at > org.apache.spark.api.java.JavaRDDLike$class.collect(JavaRDDLike.scala:361) > at > org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:45) > at > org.apache.hudi.data.HoodieJavaRDD.collectAsList(HoodieJavaRDD.java:155) > at > org.apache.hudi.table.action.compact.RunCompactionActionExecutor.execute(RunCompactionActionExecutor.java:101) > ... 19 more {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)