Hi Ethan,

It is often tricky to debug or help with issues when I do not have an idea of the data. My guess is that your schema is changing between the first and the second run. This could be related to: https://stackoverflow.com/a/42946528/4517001

It may help if you could send sample data from the first pass that went through without issues, and from the second set of data that caused the issue. Please check that you are allowed to share the data.

Thanks,
Kabeer
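P.S. To illustrate the theory: the ClassCastException at the bottom of your trace ("required binary key (UTF8) is not a group") is what parquet-avro throws when the Avro read schema declares a field as a map (or record) while the parquet file on disk stores that field as a plain string. During an upsert, Hudi has to re-read the files written by the first commit using the schema of the incoming batch, so an incompatible change between the two batches would surface exactly there. Below is a minimal, untested sketch in plain parquet-avro (no Hudi involved; the class name, record name, field name and file path are all made up for illustration) that should reproduce the same class of failure:

// Untested reproduction sketch, not Hudi code: all names here
// (SchemaMismatchRepro, "Event", the "key" field, /tmp/repro.parquet)
// are hypothetical and chosen only for illustration.
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;

public class SchemaMismatchRepro {
  public static void main(String[] args) throws Exception {
    Path path = new Path("/tmp/repro.parquet");

    // Schema of the first (insert-only) pass: "key" is a plain string.
    Schema v1 = SchemaBuilder.record("Event").fields()
        .requiredString("key")
        .endRecord();

    // Schema of the incoming data on the second pass: the same field is
    // now a map -- an incompatible change for files already on disk.
    Schema v2 = SchemaBuilder.record("Event").fields()
        .name("key").type().map().values().stringType().noDefault()
        .endRecord();

    // Write one record with the old schema.
    try (ParquetWriter<GenericRecord> writer =
             AvroParquetWriter.<GenericRecord>builder(path).withSchema(v1).build()) {
      GenericRecord rec = new GenericData.Record(v1);
      rec.put("key", "some-value");
      writer.write(rec);
    }

    // Re-read the old file with the new schema, as an update must do to
    // merge incoming records into existing files. parquet-avro builds its
    // converter tree from the read schema, so this should fail with
    // something like:
    //   java.lang.ClassCastException: required binary key (UTF8) is not a group
    Configuration conf = new Configuration();
    AvroReadSupport.setAvroReadSchema(conf, v2);
    try (ParquetReader<GenericRecord> reader =
             AvroParquetReader.<GenericRecord>builder(path).withConf(conf).build()) {
      reader.read();
    }
  }
}

To confirm whether the schema really changed, you could also dump the footer schema of a data file written by each of the two commits (for example with the parquet-tools "schema" command) and diff the two.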
On Dec 19 2019, at 7:33 pm, Y Ethan Guo <ethan.guoyi...@gmail.com> wrote:
> Hi folks,
>
> I'm testing a new Deltastreamer job in a cluster, which incrementally pulls
> data from an upstream Hudi table and upserts the dataset into another
> table. The first run of the Deltastreamer job, which involves only inserts,
> succeeded. The second run of the job, which involves updates, throws the
> following exception. I'm using a snapshot build of Hudi:
> 0.4.8-SNAPSHOT [1]. I believe this is related to schema, but I don't know
> how I should debug and fix this. Any suggestion is appreciated.
>
> org.apache.spark.SparkException: Job aborted due to stage failure:
> Task 33 in stage 78.0 failed 4 times, most recent failure: Lost task
> 33.3 in stage 78.0 (TID 13283, executor 5):
> com.uber.hoodie.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :33
> at com.uber.hoodie.table.HoodieCopyOnWriteTable.handleUpsertPartition(HoodieCopyOnWriteTable.java:271)
> at com.uber.hoodie.HoodieWriteClient.lambda$upsertRecordsInternal$7ef77fd$1(HoodieWriteClient.java:442)
> at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
> at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:844)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:844)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:336)
> at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:334)
> at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1055)
> at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1029)
> at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:969)
> at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1029)
> at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:760)
> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> at org.apache.spark.scheduler.Task.run(Task.scala:108)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: com.uber.hoodie.exception.HoodieException: com.uber.hoodie.exception.HoodieException: java.util.concurrent.ExecutionException: com.uber.hoodie.exception.HoodieException: operation has failed
> at com.uber.hoodie.table.HoodieCopyOnWriteTable.handleUpdateInternal(HoodieCopyOnWriteTable.java:206)
> at com.uber.hoodie.table.HoodieCopyOnWriteTable.handleUpdate(HoodieCopyOnWriteTable.java:181)
> at com.uber.hoodie.table.HoodieCopyOnWriteTable.handleUpsertPartition(HoodieCopyOnWriteTable.java:263)
> ... 28 more
> Caused by: com.uber.hoodie.exception.HoodieException: java.util.concurrent.ExecutionException: com.uber.hoodie.exception.HoodieException: operation has failed
> at com.uber.hoodie.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:146)
> at com.uber.hoodie.table.HoodieCopyOnWriteTable.handleUpdateInternal(HoodieCopyOnWriteTable.java:204)
> ... 30 more
> Caused by: java.util.concurrent.ExecutionException: com.uber.hoodie.exception.HoodieException: operation has failed
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> at com.uber.hoodie.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:144)
> ... 31 more
> Caused by: com.uber.hoodie.exception.HoodieException: operation has failed
> at com.uber.hoodie.common.util.queue.BoundedInMemoryQueue.throwExceptionIfFailed(BoundedInMemoryQueue.java:232)
> at com.uber.hoodie.common.util.queue.BoundedInMemoryQueue.readNextRecord(BoundedInMemoryQueue.java:211)
> at com.uber.hoodie.common.util.queue.BoundedInMemoryQueue.access$100(BoundedInMemoryQueue.java:49)
> at com.uber.hoodie.common.util.queue.BoundedInMemoryQueue$QueueIterator.hasNext(BoundedInMemoryQueue.java:262)
> at com.uber.hoodie.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:37)
> at com.uber.hoodie.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:124)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> ... 3 more
> Caused by: java.lang.ClassCastException: required binary key (UTF8) is not a group
> at org.apache.parquet.schema.Type.asGroupType(Type.java:202)
> at org.apache.parquet.avro.AvroRecordConverter$MapConverter.<init>(AvroRecordConverter.java:821)
> at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:210)
> at org.apache.parquet.avro.AvroRecordConverter.access$200(AvroRecordConverter.java:63)
> at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter$ElementConverter.<init>(AvroRecordConverter.java:435)
> at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter.<init>(AvroRecordConverter.java:385)
> at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:206)
> at org.apache.parquet.avro.AvroRecordConverter.access$200(AvroRecordConverter.java:63)
> at org.apache.parquet.avro.AvroRecordConverter$MapConverter$MapKeyValueConverter.<init>(AvroRecordConverter.java:872)
> at org.apache.parquet.avro.AvroRecordConverter$MapConverter.<init>(AvroRecordConverter.java:822)
> at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:210)
> at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:112)
> at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:79)
> at org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33)
> at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:132)
> at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:175)
> at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:149)
> at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:125)
> at com.uber.hoodie.func.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:45)
> at com.uber.hoodie.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:44)
> at com.uber.hoodie.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:94)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> ... 4 more
>
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1505)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1504)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1504)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
> at scala.Option.foreach(Option.scala:257)
> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1732)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1687)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1676)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
> at org.apache.spark.rdd.RDD$$anonfun$fold$1.apply(RDD.scala:1089)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
> at org.apache.spark.rdd.RDD.fold(RDD.scala:1083)
> at org.apache.spark.rdd.DoubleRDDFunctions$$anonfun$sum$1.apply$mcD$sp(DoubleRDDFunctions.scala:35)
> at org.apache.spark.rdd.DoubleRDDFunctions$$anonfun$sum$1.apply(DoubleRDDFunctions.scala:35)
> at org.apache.spark.rdd.DoubleRDDFunctions$$anonfun$sum$1.apply(DoubleRDDFunctions.scala:35)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
> at org.apache.spark.rdd.DoubleRDDFunctions.sum(DoubleRDDFunctions.scala:34)
> at org.apache.spark.api.java.JavaDoubleRDD.sum(JavaDoubleRDD.scala:165)
> at com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:279)
> ... 47 elided
> Caused by: com.uber.hoodie.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :33
> at com.uber.hoodie.table.HoodieCopyOnWriteTable.handleUpsertPartition(HoodieCopyOnWriteTable.java:271)
> at com.uber.hoodie.HoodieWriteClient.lambda$upsertRecordsInternal$7ef77fd$1(HoodieWriteClient.java:442)
> at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
> at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:844)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:844)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:336)
> at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:334)
> at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1055)
> at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1029)
> at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:969)
> at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1029)
> at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:760)
> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> at org.apache.spark.scheduler.Task.run(Task.scala:108)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
> ... 3 more
> Caused by: com.uber.hoodie.exception.HoodieException: com.uber.hoodie.exception.HoodieException: java.util.concurrent.ExecutionException: com.uber.hoodie.exception.HoodieException: operation has failed
> at com.uber.hoodie.table.HoodieCopyOnWriteTable.handleUpdateInternal(HoodieCopyOnWriteTable.java:206)
> at com.uber.hoodie.table.HoodieCopyOnWriteTable.handleUpdate(HoodieCopyOnWriteTable.java:181)
> at com.uber.hoodie.table.HoodieCopyOnWriteTable.handleUpsertPartition(HoodieCopyOnWriteTable.java:263)
> ... 28 more
> Caused by: com.uber.hoodie.exception.HoodieException: java.util.concurrent.ExecutionException: com.uber.hoodie.exception.HoodieException: operation has failed
> at com.uber.hoodie.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:146)
> at com.uber.hoodie.table.HoodieCopyOnWriteTable.handleUpdateInternal(HoodieCopyOnWriteTable.java:204)
> ... 30 more
> Caused by: java.util.concurrent.ExecutionException: com.uber.hoodie.exception.HoodieException: operation has failed
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> at com.uber.hoodie.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:144)
> ... 31 more
> Caused by: com.uber.hoodie.exception.HoodieException: operation has failed
> at com.uber.hoodie.common.util.queue.BoundedInMemoryQueue.throwExceptionIfFailed(BoundedInMemoryQueue.java:232)
> at com.uber.hoodie.common.util.queue.BoundedInMemoryQueue.readNextRecord(BoundedInMemoryQueue.java:211)
> at com.uber.hoodie.common.util.queue.BoundedInMemoryQueue.access$100(BoundedInMemoryQueue.java:49)
> at com.uber.hoodie.common.util.queue.BoundedInMemoryQueue$QueueIterator.hasNext(BoundedInMemoryQueue.java:262)
> at com.uber.hoodie.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:37)
> at com.uber.hoodie.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:124)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> ... 3 more
> Caused by: java.lang.ClassCastException: required binary key (UTF8) is not a group
> at org.apache.parquet.schema.Type.asGroupType(Type.java:202)
> at org.apache.parquet.avro.AvroRecordConverter$MapConverter.<init>(AvroRecordConverter.java:821)
> at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:210)
> at org.apache.parquet.avro.AvroRecordConverter.access$200(AvroRecordConverter.java:63)
> at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter$ElementConverter.<init>(AvroRecordConverter.java:435)
> at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter.<init>(AvroRecordConverter.java:385)
> at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:206)
> at org.apache.parquet.avro.AvroRecordConverter.access$200(AvroRecordConverter.java:63)
> at org.apache.parquet.avro.AvroRecordConverter$MapConverter$MapKeyValueConverter.<init>(AvroRecordConverter.java:872)
> at org.apache.parquet.avro.AvroRecordConverter$MapConverter.<init>(AvroRecordConverter.java:822)
> at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:210)
> at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:112)
> at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:79)
> at org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33)
> at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:132)
> at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:175)
> at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:149)
> at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:125)
> at com.uber.hoodie.func.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:45)
> at com.uber.hoodie.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:44)
> at com.uber.hoodie.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:94)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> ... 4 more
>
> [1]
> https://oss.sonatype.org/content/repositories/snapshots/com/uber/hoodie/hoodie-utilities-bundle/0.4.8-SNAPSHOT/hoodie-utilities-bundle-0.4.8-20190531.060546-1.jar
>
> Thanks,
> - Ethan