sbernauer opened a new issue #1845:
URL: https://github.com/apache/hudi/issues/1845


   Hi Hudi team,
   
   In https://cwiki.apache.org/confluence/display/HUDI/FAQ#FAQ-What'sHudi'sschemaevolutionstory you describe that "As long as the schema passed to Hudi [...] is backwards compatible [...] Hudi will seamlessly handle read/write of old and new data and also keep the Hive schema up-to date."
   We need this mechanism and are trying to use it, but it fails for us.
   **Steps to reproduce:**
   1. We have an existing schema and events written with it.
   2. We update the schema with a new optional union field and restart the DeltaStreamer (a sketch of such a schema change is shown right after the stack trace below).
   3. We ingest new events.
   4. We ingest old events again (some of them are upserts). In production both event versions would coexist until all producers have switched to the new version. At this step the DeltaStreamer crashes with:
   ```
   210545 [pool-28-thread-1] ERROR org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer  - Shutting down delta-sync due to exception
   org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 22.0 failed 4 times, most recent failure: Lost task 0.3 in stage 22.0 (TID 1162, 100.65.150.166, executor 1): org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
           at org.apache.hudi.table.action.commit.BaseCommitActionExecutor.handleUpsertPartition(BaseCommitActionExecutor.java:253)
           at org.apache.hudi.table.action.commit.BaseCommitActionExecutor.lambda$execute$caffe4c4$1(BaseCommitActionExecutor.java:102)
           at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
           at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
           at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:889)
           at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:889)
           at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
           at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
           at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
           at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
           at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:362)
           at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1388)
           at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1298)
           at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1362)
           at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1186)
           at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:360)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:311)
           at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
           at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
           at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
           at org.apache.spark.scheduler.Task.run(Task.scala:127)
           at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
           at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
           at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
           at java.lang.Thread.run(Thread.java:748)
   Caused by: org.apache.hudi.exception.HoodieUpsertException: Failed to close UpdateHandle
           at org.apache.hudi.io.HoodieMergeHandle.close(HoodieMergeHandle.java:302)
           at org.apache.hudi.table.action.commit.CommitActionExecutor.handleUpdateInternal(CommitActionExecutor.java:101)
           at org.apache.hudi.table.action.commit.CommitActionExecutor.handleUpdate(CommitActionExecutor.java:74)
           at org.apache.hudi.table.action.deltacommit.DeltaCommitActionExecutor.handleUpdate(DeltaCommitActionExecutor.java:73)
           at org.apache.hudi.table.action.commit.BaseCommitActionExecutor.handleUpsertPartition(BaseCommitActionExecutor.java:246)
           ... 28 more
   Caused by: java.io.EOFException
           at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:473)
           at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:128)
           at org.apache.avro.io.BinaryDecoder.readIndex(BinaryDecoder.java:423)
           at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
           at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
           at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
           at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
           at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
           at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
           at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
           at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
           at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
           at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
           at org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro(HoodieAvroUtils.java:108)
           at org.apache.hudi.common.model.OverwriteWithLatestAvroPayload.getInsertValue(OverwriteWithLatestAvroPayload.java:80)
           at org.apache.hudi.io.HoodieMergeHandle.close(HoodieMergeHandle.java:271)
           ... 32 more
   ```
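
   To make step 2 concrete, here is a minimal sketch of the kind of schema change we make. The schemas and field names are made up for illustration, not our real event schema; the only difference between the two versions is one added optional union field with a null default:
   ```java
   import org.apache.avro.Schema;

   // Hypothetical stand-ins for our real schemas (illustration of step 2 only).
   Schema oldSchema = new Schema.Parser().parse(
       "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
           + "{\"name\":\"id\",\"type\":\"string\"}]}");

   // New version: identical record, plus one optional union field with a null default.
   Schema newSchema = new Schema.Parser().parse(
       "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
           + "{\"name\":\"id\",\"type\":\"string\"},"
           + "{\"name\":\"note\",\"type\":[\"null\",\"string\"],\"default\":null}]}");
   ```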
   
   I've created a test to reproduce this behaviour here: https://github.com/apache/hudi/pull/1844. I hope the test is helpful.
   The test causes the following exception instead, but I think the two are closely related:
   ```
   Job aborted due to stage failure: Task 0 in stage 16.0 failed 1 times, most recent failure: Lost task 0.0 in stage 16.0 (TID 23, localhost, executor driver): java.lang.ArrayIndexOutOfBoundsException: 19
           at org.apache.avro.generic.GenericData$Record.get(GenericData.java:212)
           at org.apache.hudi.AvroConversionHelper$.$anonfun$createConverterToRow$9(AvroConversionHelper.scala:170)
           at org.apache.hudi.AvroConversionUtils$.$anonfun$createDataFrame$2(AvroConversionUtils.scala:66)
           at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
           at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
           at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
           at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
           at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
           at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:636)
           at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
           at scala.collection.Iterator.isEmpty(Iterator.scala:385)
           at scala.collection.Iterator.isEmpty$(Iterator.scala:385)
           at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1429)
           at org.apache.hudi.AvroConversionUtils$.$anonfun$createRdd$2(AvroConversionUtils.scala:44)
           at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:801)
           at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:801)
           at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
           at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
           at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
           at org.apache.spark.scheduler.Task.run(Task.scala:123)
           at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
           at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
           at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
           at java.lang.Thread.run(Thread.java:748)

   Driver stacktrace:
           at org.apache.hudi.utilities.functional.TestHoodieDeltaStreamer.testSchemaEvolution(TestHoodieDeltaStreamer.java:491)
   Caused by: java.lang.ArrayIndexOutOfBoundsException: 19
   ```
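
   My guess at why the test fails differently (this is only an assumption; the schema below is hypothetical): the Avro records still carry the old writer schema, while AvroConversionHelper builds the Row converter from the new schema, so the converter eventually asks the record for a field index it does not have:
   ```java
   import org.apache.avro.Schema;
   import org.apache.avro.generic.GenericData;

   // A record created against a hypothetical old schema with a single field.
   Schema oldSchema = new Schema.Parser().parse(
       "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
           + "{\"name\":\"id\",\"type\":\"string\"}]}");
   GenericData.Record record = new GenericData.Record(oldSchema);
   record.put("id", "42");

   // A converter built from a newer schema with one extra field would iterate over
   // two fields and end up calling get(1) on a record backed by a one-element array:
   record.get(1); // -> java.lang.ArrayIndexOutOfBoundsException: 1
   ```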
   
   I think this entry from the Avro mailing list could help a lot: http://apache-avro.679487.n3.nabble.com/AVRO-schema-evolution-adding-optional-column-with-default-fails-deserialization-td4043025.html
   The corresponding place in the code is here: https://github.com/apache/hudi/blob/bf1d36fa639cae558aa388d8d547e58ad2e67aba/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java#L107
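
   Based on that thread, my understanding is that bytesToAvro currently decodes the stored bytes with a single schema, so bytes written with the old schema cannot be resolved against the new one. Decoding with both the writer schema and the reader schema lets Avro fill the added optional field from its default. This is only a sketch of the idea with hypothetical schemas, not a patch:
   ```java
   import java.io.ByteArrayOutputStream;
   import java.io.IOException;

   import org.apache.avro.Schema;
   import org.apache.avro.generic.GenericDatumReader;
   import org.apache.avro.generic.GenericDatumWriter;
   import org.apache.avro.generic.GenericRecord;
   import org.apache.avro.generic.GenericRecordBuilder;
   import org.apache.avro.io.BinaryDecoder;
   import org.apache.avro.io.BinaryEncoder;
   import org.apache.avro.io.DecoderFactory;
   import org.apache.avro.io.EncoderFactory;

   public class AvroEvolutionSketch {
     // Hypothetical old/new schemas, matching the kind of change from step 2 above.
     static final Schema OLD_SCHEMA = new Schema.Parser().parse(
         "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
             + "{\"name\":\"id\",\"type\":\"string\"}]}");
     static final Schema NEW_SCHEMA = new Schema.Parser().parse(
         "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
             + "{\"name\":\"id\",\"type\":\"string\"},"
             + "{\"name\":\"note\",\"type\":[\"null\",\"string\"],\"default\":null}]}");

     public static void main(String[] args) throws IOException {
       // Serialize a record with the old (writer) schema, like the data already stored.
       GenericRecord oldEvent = new GenericRecordBuilder(OLD_SCHEMA).set("id", "42").build();
       ByteArrayOutputStream out = new ByteArrayOutputStream();
       BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
       new GenericDatumWriter<GenericRecord>(OLD_SCHEMA).write(oldEvent, encoder);
       encoder.flush();
       byte[] bytes = out.toByteArray();

       // Decoding with only the new schema makes Avro look for the union index of the
       // added field in the byte stream and run off the end -> java.io.EOFException,
       // which matches the first stack trace above:
       // new GenericDatumReader<GenericRecord>(NEW_SCHEMA)
       //     .read(null, DecoderFactory.get().binaryDecoder(bytes, null));

       // Decoding with writer schema + reader schema resolves the difference and
       // fills the missing field with its default.
       BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
       GenericRecord resolved =
           new GenericDatumReader<GenericRecord>(OLD_SCHEMA, NEW_SCHEMA).read(null, decoder);
       System.out.println(resolved); // {"id": "42", "note": null}
     }
   }
   ```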
   
   
   Thanks a lot in advance!
   
   Cheers,
   Sebastian
   
   **Expected behavior**
   As stated in the FAQ, a backwards-compatible schema evolution should not crash the DeltaStreamer.
   
   **Environment Description**
   
   * Hudi version : current master branch
   
   * Spark version : 3.0.0
   
   * Hive version : 3.1.2
   
   * Hadoop version : 3.2.1
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : yes, running on kubernetes
   
   

