[ https://issues.apache.org/jira/browse/HUDI-4588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Alexey Kudinkin updated HUDI-4588:
----------------------------------
    Sprint: 2022/08/08, 2022/08/22, 2022/09/05, 2022/10/04  (was: 2022/08/08, 2022/08/22, 2022/09/05)

> Ingestion failing if source column is dropped
> ---------------------------------------------
>
>                 Key: HUDI-4588
>                 URL: https://issues.apache.org/jira/browse/HUDI-4588
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: deltastreamer
>            Reporter: Vamshi Gudavarthi
>            Assignee: Alexey Kudinkin
>            Priority: Blocker
>              Labels: pull-request-available, schema, schema-evolution
>             Fix For: 0.12.1
>
>         Attachments: schema_stage1.avsc, schema_stage2.avsc, stage_1.json, stage_2.json
>
>
> Ingestion using Deltastreamer fails if columns are dropped from the source. I reproduced this with the docker-demo setup; the steps are below.
> # Created the data file `stage_1.json` (attached), published it to Kafka, and ingested it from Kafka into a Hudi table with a Deltastreamer job (using FilebasedSchemaProvider with `schema_stage1.avsc`).
> # The next step simulates dropping a column from the source.
> # Repeated step 1 with the stage-2 files. The stage-2 files do not have the `day` column, and the ingestion job failed. The detailed stacktrace is below.
> {code:java}
> Driver stacktrace:
>   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
>   at scala.Option.foreach(Option.scala:257)
>   at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
>   at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2158)
>   at org.apache.spark.rdd.RDD$$anonfun$fold$1.apply(RDD.scala:1098)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
>   at org.apache.spark.rdd.RDD.fold(RDD.scala:1092)
>   at org.apache.spark.rdd.DoubleRDDFunctions$$anonfun$sum$1.apply$mcD$sp(DoubleRDDFunctions.scala:35)
>   at org.apache.spark.rdd.DoubleRDDFunctions$$anonfun$sum$1.apply(DoubleRDDFunctions.scala:35)
>   at org.apache.spark.rdd.DoubleRDDFunctions$$anonfun$sum$1.apply(DoubleRDDFunctions.scala:35)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
>   at org.apache.spark.rdd.DoubleRDDFunctions.sum(DoubleRDDFunctions.scala:34)
>   at org.apache.spark.api.java.JavaDoubleRDD.sum(JavaDoubleRDD.scala:165)
>   at org.apache.hudi.utilities.deltastreamer.DeltaSync.writeToSink(DeltaSync.java:607)
>   at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:335)
>   at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$2(HoodieDeltaStreamer.java:201)
>   at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
>   at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:199)
>   at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:557)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
>   at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
>   at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
>   at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
>   at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
>   at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
>   at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
>   at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
>   at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:329)
>   at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:244)
>   at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
>   at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:853)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:853)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>   at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337)
>   at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335)
>   at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
>   at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
>   at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
>   at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
>   at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
>   at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>   at org.apache.spark.scheduler.Task.run(Task.scala:123)
>   at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
>   at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:149)
>   at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:358)
>   at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:349)
>   at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322)
>   ... 30 more
> Caused by: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
>   at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:161)
>   at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:147)
>   ... 33 more
> Caused by: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:155)
>   ... 34 more
> Caused by: org.apache.hudi.exception.HoodieException: operation has failed
>   at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.throwExceptionIfFailed(BoundedInMemoryQueue.java:248)
>   at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.readNextRecord(BoundedInMemoryQueue.java:226)
>   at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.access$100(BoundedInMemoryQueue.java:52)
>   at org.apache.hudi.common.util.queue.BoundedInMemoryQueue$QueueIterator.hasNext(BoundedInMemoryQueue.java:278)
>   at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:36)
>   at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:135)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   ... 3 more
> Caused by: org.apache.hudi.exception.HoodieException: unable to read next record from parquet file
>   at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:53)
>   at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
>   at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:106)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   ... 4 more
> Caused by: org.apache.parquet.io.InvalidRecordException: Parquet/Avro schema mismatch: Avro field 'day' not found
>   at org.apache.parquet.avro.AvroRecordConverter.getAvroField(AvroRecordConverter.java:225)
>   at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:130)
>   at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:95)
>   at org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33)
>   at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:138)
>   at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:183)
>   at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:156)
>   at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
>   at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:48)
>   ... 8 more
> {code}
>
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
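For readers triaging similar failures: the trace above bottoms out in parquet-avro. The merge path re-reads the existing base Parquet file, which was written with the stage-1 schema and still contains `day`, using the new reader schema that lacks `day`, and AvroRecordConverter rejects the mismatch. A minimal standalone sketch of that underlying parquet-avro behavior, independent of Hudi, follows; the class name, file path, and inline schema are illustrative assumptions, not the attached artifacts.

{code:java}
// Hypothetical sketch: read a Parquet base file through parquet-avro with an
// Avro read schema from which the 'day' field has been dropped. This fails
// the same way the Hudi merge does in the stacktrace above.
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.hadoop.ParquetReader;

public class DroppedColumnRepro {
  public static void main(String[] args) throws Exception {
    // Stage-2-style reader schema: 'day' no longer exists in the source.
    Schema readSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"trip\",\"fields\":["
        + "{\"name\":\"id\",\"type\":\"string\"},"
        + "{\"name\":\"amount\",\"type\":\"double\"}]}");

    Configuration conf = new Configuration();
    AvroReadSupport.setAvroReadSchema(conf, readSchema);

    // Base file previously written with the stage-1 schema (still has 'day');
    // the path is a placeholder, not a file from this report.
    Path baseFile = new Path("/tmp/base_file_written_with_stage1_schema.parquet");
    try (ParquetReader<GenericRecord> reader =
             AvroParquetReader.<GenericRecord>builder(baseFile).withConf(conf).build()) {
      // The first read() initializes the reader and throws
      // org.apache.parquet.io.InvalidRecordException:
      // "Parquet/Avro schema mismatch: Avro field 'day' not found"
      GenericRecord record = reader.read();
      System.out.println(record);
    }
  }
}
{code}

Until the fix lands in 0.12.1, a common mitigation is to keep the dropped column in the schema served by the schema provider as a nullable field with a `null` default, so the old base files remain readable during merges.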