[ https://issues.apache.org/jira/browse/HUDI-5034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Y Ethan Guo reassigned HUDI-5034: --------------------------------- Assignee: Y Ethan Guo > Enum info lost during schema conversion > --------------------------------------- > > Key: HUDI-5034 > URL: https://issues.apache.org/jira/browse/HUDI-5034 > Project: Apache Hudi > Issue Type: Bug > Components: deltastreamer > Reporter: Shawn Chang > Assignee: Y Ethan Guo > Priority: Blocker > Labels: pull-request-available > Fix For: 1.0.0 > > > When a transformer is used in deltastreamer sync, SparkAvroPostProcessor > would be attached to SchemaProvider by default (see > [code|https://github.com/apache/hudi/blob/94c068ae4fd75ab011e6a3f1c593fdd5db42da3b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java#L485]) > > And in SparkAvroPostProcessor it converts the avro schema to a struct type > schema and then converts it back immediately (see > [code|https://github.com/apache/hudi/blob/94c068ae4fd75ab011e6a3f1c593fdd5db42da3b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SparkAvroPostProcessor.java#L42]) > > > But during the conversion, if the original avro schema has an 'enum' field > specified, the field would be lost: the schema would first be converted to struct > type schema and the 'enum' would be converted to 'string' type. And when it's > converted back to avro type, the 'string' type would not be converted back to > 'enum'. 
> > Steps to reproduce: > # Prepare an avro schema that contains enum field, sample below > # > {code:java} > { > "name": "accountDataRecord", > "namespace": "sample.test", > "type": "record", > "fields": [ > { > "name": "action", > "type": { > "name": "testEnum", > "type" : "enum", > "symbols": [ > "INSERT", > "UPDATE", > "DELETE" > ] > } > }, > {"name":"ts","type":"int"} > ] > } {code} > # Run Deltastreamer with a transformer > # Exception: > # > {code:java} > Driver stacktrace: > at > org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2610) > at > org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2559) > at > org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2558) > at > scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) > at > scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) > at > org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2558) > at > org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1200) > at > org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1200) > at scala.Option.foreach(Option.scala:407) > at > org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1200) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2798) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2740) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2729) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) > at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:978) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:2215) > at 
org.apache.spark.SparkContext.runJob(SparkContext.scala:2236) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:2255) > at org.apache.spark.rdd.RDD.$anonfun$take$1(RDD.scala:1449) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) > at org.apache.spark.rdd.RDD.withScope(RDD.scala:414) > at org.apache.spark.rdd.RDD.take(RDD.scala:1422) > at org.apache.spark.rdd.RDD.$anonfun$isEmpty$1(RDD.scala:1557) > at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) > at org.apache.spark.rdd.RDD.withScope(RDD.scala:414) > at org.apache.spark.rdd.RDD.isEmpty(RDD.scala:1557) > at > org.apache.hudi.AvroConversionUtils$.createDataFrame(AvroConversionUtils.scala:131) > at > org.apache.hudi.AvroConversionUtils.createDataFrame(AvroConversionUtils.scala) > at > org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter.lambda$fetchNewDataInRowFormat$2(SourceFormatAdapter.java:109) > at org.apache.hudi.common.util.Option.map(Option.java:108) > at > org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter.fetchNewDataInRowFormat(SourceFormatAdapter.java:109) > at > org.apache.hudi.utilities.deltastreamer.DeltaSync.fetchFromSource(DeltaSync.java:424) > at > org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:398) > at > org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:303) > at > org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$2(HoodieDeltaStreamer.java:200) > at org.apache.hudi.common.util.Option.ifPresent(Option.java:97) > at > org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:198) > at > 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:549) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52) > at > org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1000) > at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180) > at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203) > at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90) > at > org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1089) > at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1098) > at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) > Caused by: org.apache.avro.AvroTypeException: Found sample.test.testEnum, > expecting string > at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308) > at org.apache.avro.io.parsing.Parser.advance(Parser.java:86) > at > org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:203) > at > org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:469) > at > org.apache.avro.reflect.ReflectDatumReader.readString(ReflectDatumReader.java:222) > at > org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:459) > at > org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:191) > at > org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160) > at > org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259) > at > org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:142) > at > 
org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:298) > at > org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247) > at > org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123) > at > org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179) > at > org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160) > at > org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153) > at org.apache.avro.file.DataFileStream.next(DataFileStream.java:251) > at > org.apache.avro.mapreduce.AvroRecordReaderBase.nextKeyValue(AvroRecordReaderBase.java:126) > at > org.apache.avro.mapreduce.AvroKeyRecordReader.nextKeyValue(AvroKeyRecordReader.java:55) > at > org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:247) > at > org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) > at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) > at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) > at scala.collection.Iterator$SliceIterator.hasNext(Iterator.scala:268) > at scala.collection.Iterator.foreach(Iterator.scala:943) > at scala.collection.Iterator.foreach$(Iterator.scala:943) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) > at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62) > {code} > Some logs I added to expose the issues: > {code:java} > // original schema > 22/10/14 03:33:51 INFO UtilHelpers: UtilHelpers, > wrapSchemaProviderWithPostProcessor, schema provider by the end: { > "type" : "record", > "name" : "accountDataRecord", > "namespace" : "sample.test", > "fields" : [ { > "name" : "action", > "type" : { > "type" : "enum", > "name" : "testEnum", > "symbols" : [ "INSERT", "UPDATE", "DELETE" ] > } > }, { > "name" : "ts", > "type" : "int" > } ] > } > /* Around this LOC > 
(https://github.com/apache/hudi/blob/94c068ae4fd75ab011e6a3f1c593fdd5db42da3b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java#L673) > The schema converted back has already lost the enum field */ > 22/10/14 03:33:51 INFO HoodieDeltaStreamer: HoodieDeltaStreamer, source > schema from schema provider: { > "type" : "record", > "name" : "hoodie_source", > "namespace" : "hoodie.source", > "fields" : [ { > "name" : "action", > "type" : "string" > }, { > "name" : "ts", > "type" : "int" > } ] > } {code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)