[ https://issues.apache.org/jira/browse/HUDI-5034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Y Ethan Guo reassigned HUDI-5034:
---------------------------------

    Assignee: Y Ethan Guo

> Enum info lost during schema conversion
> ---------------------------------------
>
>                 Key: HUDI-5034
>                 URL: https://issues.apache.org/jira/browse/HUDI-5034
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: deltastreamer
>            Reporter: Shawn Chang
>            Assignee: Y Ethan Guo
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.0.0
>
>
> When a transformer is used in a Deltastreamer sync, SparkAvroPostProcessor is attached to the SchemaProvider by default (see [code|https://github.com/apache/hudi/blob/94c068ae4fd75ab011e6a3f1c593fdd5db42da3b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java#L485]).
>  
> In SparkAvroPostProcessor, the Avro schema is converted to a Spark struct type schema and then immediately converted back (see [code|https://github.com/apache/hudi/blob/94c068ae4fd75ab011e6a3f1c593fdd5db42da3b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SparkAvroPostProcessor.java#L42]).
>  
>  
> During this round trip, any 'enum' field in the original Avro schema loses its type information: the conversion to a struct type turns the 'enum' into a 'string' type, and the conversion back to Avro does not restore the 'string' to 'enum'.
>  
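> To see the loss in isolation, the round trip can be reproduced directly with Spark's schema converters, which Hudi's AvroConversionUtils wraps. A minimal sketch, assuming the spark-avro module is on the classpath; the class name and the hoodie_source record/namespace arguments below are only for illustration:
> {code:java}
> import org.apache.avro.Schema;
> import org.apache.spark.sql.avro.SchemaConverters;
> import org.apache.spark.sql.types.StructType;
> 
> public class EnumRoundTripSketch {
>   public static void main(String[] args) {
>     // The sample schema from the reproduction steps below, inlined.
>     Schema original = new Schema.Parser().parse(
>         "{\"type\":\"record\",\"name\":\"accountDataRecord\",\"namespace\":\"sample.test\","
>             + "\"fields\":[{\"name\":\"action\",\"type\":{\"type\":\"enum\",\"name\":\"testEnum\","
>             + "\"symbols\":[\"INSERT\",\"UPDATE\",\"DELETE\"]}},{\"name\":\"ts\",\"type\":\"int\"}]}");
> 
>     // Avro -> Catalyst: StructType has no enum concept, so 'enum' collapses to StringType.
>     StructType struct = (StructType) SchemaConverters.toSqlType(original).dataType();
> 
>     // Catalyst -> Avro: StringType maps back to plain "string", never to "enum".
>     Schema roundTripped =
>         SchemaConverters.toAvroType(struct, false, "hoodie_source", "hoodie.source");
> 
>     System.out.println(original.getField("action").schema().getType());     // ENUM
>     System.out.println(roundTripped.getField("action").schema().getType()); // STRING
>   }
> }
> {code}
>  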
> Steps to reproduce:
>  # Prepare an Avro schema that contains an enum field, as in the sample below:
> {code:java}
> {
>     "name": "accountDataRecord",
>     "namespace": "sample.test",
>     "type": "record",
>     "fields": [
>         {
>             "name": "action",
>             "type": {
>                 "name": "testEnum",
>                 "type": "enum",
>                 "symbols": [
>                     "INSERT",
>                     "UPDATE",
>                     "DELETE"
>                 ]
>             }
>         },
>         {"name": "ts", "type": "int"}
>     ]
> } {code}
>  # Run Deltastreamer with a transformer
>  # Observe the exception below:
> {code:java}
> Driver stacktrace:
>     at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2610)
>     at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2559)
>     at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2558)
>     at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>     at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2558)
>     at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1200)
>     at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1200)
>     at scala.Option.foreach(Option.scala:407)
>     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1200)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2798)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2740)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2729)
>     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
>     at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:978)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2215)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2255)
>     at org.apache.spark.rdd.RDD.$anonfun$take$1(RDD.scala:1449)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>     at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
>     at org.apache.spark.rdd.RDD.take(RDD.scala:1422)
>     at org.apache.spark.rdd.RDD.$anonfun$isEmpty$1(RDD.scala:1557)
>     at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>     at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
>     at org.apache.spark.rdd.RDD.isEmpty(RDD.scala:1557)
>     at org.apache.hudi.AvroConversionUtils$.createDataFrame(AvroConversionUtils.scala:131)
>     at org.apache.hudi.AvroConversionUtils.createDataFrame(AvroConversionUtils.scala)
>     at org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter.lambda$fetchNewDataInRowFormat$2(SourceFormatAdapter.java:109)
>     at org.apache.hudi.common.util.Option.map(Option.java:108)
>     at org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter.fetchNewDataInRowFormat(SourceFormatAdapter.java:109)
>     at org.apache.hudi.utilities.deltastreamer.DeltaSync.fetchFromSource(DeltaSync.java:424)
>     at org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:398)
>     at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:303)
>     at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$2(HoodieDeltaStreamer.java:200)
>     at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
>     at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:198)
>     at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:549)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
>     at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1000)
>     at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
>     at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
>     at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
>     at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1089)
>     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1098)
>     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: org.apache.avro.AvroTypeException: Found sample.test.testEnum, expecting string
>     at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
>     at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
>     at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:203)
>     at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:469)
>     at org.apache.avro.reflect.ReflectDatumReader.readString(ReflectDatumReader.java:222)
>     at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:459)
>     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:191)
>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>     at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
>     at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:142)
>     at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:298)
>     at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
>     at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
>     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>     at org.apache.avro.file.DataFileStream.next(DataFileStream.java:251)
>     at org.apache.avro.mapreduce.AvroRecordReaderBase.nextKeyValue(AvroRecordReaderBase.java:126)
>     at org.apache.avro.mapreduce.AvroKeyRecordReader.nextKeyValue(AvroKeyRecordReader.java:55)
>     at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:247)
>     at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>     at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
>     at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
>     at scala.collection.Iterator$SliceIterator.hasNext(Iterator.scala:268)
>     at scala.collection.Iterator.foreach(Iterator.scala:943)
>     at scala.collection.Iterator.foreach$(Iterator.scala:943)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>     at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
> {code}
> Some logs I added to expose the issue:
> {code:java}
> // original schema
> 22/10/14 03:33:51 INFO UtilHelpers: UtilHelpers, wrapSchemaProviderWithPostProcessor, schema provider by the end: {
>   "type" : "record",
>   "name" : "accountDataRecord",
>   "namespace" : "sample.test",
>   "fields" : [ {
>     "name" : "action",
>     "type" : {
>       "type" : "enum",
>       "name" : "testEnum",
>       "symbols" : [ "INSERT", "UPDATE", "DELETE" ]
>     }
>   }, {
>     "name" : "ts",
>     "type" : "int"
>   } ]
> }
> /* Around this LOC (https://github.com/apache/hudi/blob/94c068ae4fd75ab011e6a3f1c593fdd5db42da3b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java#L673), the schema converted back has already lost the enum type */
> 22/10/14 03:33:51 INFO HoodieDeltaStreamer: HoodieDeltaStreamer, source schema from schema provider: {
>   "type" : "record",
>   "name" : "hoodie_source",
>   "namespace" : "hoodie.source",
>   "fields" : [ {
>     "name" : "action",
>     "type" : "string"
>   }, {
>     "name" : "ts",
>     "type" : "int"
>   } ]
> } {code}
>  
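> One possible fix direction, sketched as a hypothetical helper (not the actual patch): after the Spark round trip, consult the original schema and restore any field whose type was an enum. Only flat record schemas are handled, for brevity; nested records and unions would need a recursive walk.
> {code:java}
> import java.util.List;
> import java.util.stream.Collectors;
> import org.apache.avro.Schema;
> import org.apache.avro.Schema.Field;
> 
> public class EnumRestoreSketch {
>   // Rebuild 'converted' so that any top-level field whose original type was
>   // an enum gets its original enum schema back. New Field objects must be
>   // created because Avro does not allow reusing a Field in another record.
>   // Uses the Object-based Field constructor from Avro 1.9+.
>   public static Schema restoreEnums(Schema original, Schema converted) {
>     List<Field> fields = converted.getFields().stream().map(f -> {
>       Field origField = original.getField(f.name());
>       boolean wasEnum = origField != null
>           && origField.schema().getType() == Schema.Type.ENUM;
>       Schema fieldSchema = wasEnum ? origField.schema() : f.schema();
>       return new Field(f.name(), fieldSchema, f.doc(), f.defaultVal());
>     }).collect(Collectors.toList());
>     return Schema.createRecord(converted.getName(), converted.getDoc(),
>         converted.getNamespace(), converted.isError(), fields);
>   }
> }
> {code}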



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
