Sagar Sumit updated HUDI-1602:
------------------------------
    Story Points: 1  (was: 0.5)

Corrupted Avro schema extracted from parquet file
-------------------------------------------------

                 Key: HUDI-1602
                 URL: https://issues.apache.org/jira/browse/HUDI-1602
             Project: Apache Hudi
          Issue Type: Bug
    Affects Versions: 0.9.0
            Reporter: Alexander Filipchik
            Assignee: Sagar Sumit
            Priority: Blocker
              Labels: core-flow-ds, pull-request-available, sev:critical
             Fix For: 0.11.0

We are running a Hudi DeltaStreamer on a very complex stream. The schema is deeply nested, with several levels of hierarchy (the Avro schema is around 6,600 lines).

The version of Hudi that wrote the dataset is 0.5-SNAPSHOT, and we recently started attempting to upgrade to the latest release. However, the latest Hudi cannot read the existing dataset. The exception I get:

{code:java}
Got exception while parsing the arguments:Got exception while parsing the arguments:Found recursive reference in Avro schema, which can not be processed by Spark:
{
  "type" : "record",
  "name" : "array",
  "fields" : [ {
    "name" : "id",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "type",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "exist",
    "type" : [ "null", "boolean" ],
    "default" : null
  } ]
}
Stack trace:
org.apache.spark.sql.avro.IncompatibleSchemaException: Found recursive reference in Avro schema, which can not be processed by Spark:
{
  "type" : "record",
  "name" : "array",
  "fields" : [ {
    "name" : "id",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "type",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "exist",
    "type" : [ "null", "boolean" ],
    "default" : null
  } ]
}
  at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:75)
  at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:89)
  at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:105)
  at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:82)
  at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:81)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
  at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:81)
  at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:105)
  at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:82)
  at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:81)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
  at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:81)
  at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:105)
  at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:82)
  at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:81)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
  at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:81)
  at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:89)
  at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:105)
  at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:82)
  at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:81)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
  at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:81)
  at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:105)
  at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:82)
  at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:81)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
  at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:81)
  at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:105)
  at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:82)
  at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:81)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
  at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:81)
  at org.apache.spark.sql.avro.SchemaConverters$.toSqlType(SchemaConverters.scala:46)
  at org.apache.hudi.AvroConversionUtils$.convertAvroSchemaToStructType(AvroConversionUtils.scala:56)
  at org.apache.hudi.MergeOnReadSnapshotRelation.<init>(MergeOnReadSnapshotRelation.scala:67)
  at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:89)
  at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:53)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:318)
  at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
  at com.css.dw.spark.SQLHudiOutputJob.run(SQLHudiOutputJob.java:118)
  at com.css.dw.spark.SQLHudiOutputJob.main(SQLHudiOutputJob.java:164)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
  at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
  at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
  at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
  at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
  at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
{code}

I wrote a simple test that opens the parquet file, loads its schema, and attempts to convert it into Avro; it fails with the same error.
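For reference, a minimal sketch of such a round-trip test (not the reporter's actual code; it assumes parquet-avro and spark-avro on the classpath, and takes the base-file path as its only argument). It reads the Parquet footer, converts the file's MessageType back to Avro roughly the way Hudi resolves table schemas, and hands the result to Spark's converter, which then fails as above:

{code:java}
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.schema.MessageType;
import org.apache.spark.sql.avro.SchemaConverters;

public class SchemaRoundTripRepro {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // 1. Read the file footer and pull out the Parquet schema.
    MessageType parquetSchema;
    try (ParquetFileReader reader =
        ParquetFileReader.open(HadoopInputFile.fromPath(new Path(args[0]), conf))) {
      parquetSchema = reader.getFooter().getFileMetaData().getSchema();
    }

    // 2. Convert it back to Avro. Nested lists come back as synthetic
    //    element records that all share a single name.
    Schema avroSchema = new AvroSchemaConverter(conf).convert(parquetSchema);
    System.out.println(avroSchema.toString(true));

    // 3. Feed the extracted schema to Spark: this throws
    //    IncompatibleSchemaException "Found recursive reference in Avro schema ...".
    SchemaConverters.toSqlType(avroSchema);
  }
}
{code}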
It appears that an Avro schema that looked like this:

{noformat}
{
  "name": "entity_path",
  "type": [
    "null",
    {
      "type": "record",
      "name": "MenuEntityPath",
      "fields": [
        {
          "name": "path_nodes",
          "type": [
            "null",
            {
              "type": "array",
              "items": {
                "type": "record",
                "name": "PathNode",
                "namespace": "Menue_pathPath$",
                "fields": [
                  {
                    "name": "id",
                    "type": [
                      "null",
                      {
                        "type": "string",
                        "avro.java.string": "String"
                      }
                    ],
                    "default": null
                  },
                  {
                    "name": "type",
                    "type": [
                      "null",
                      {
                        "type": "enum",
                        "name": "MenuEntityType",
                        "namespace": "shared",
                        "symbols": [ "UNKNOWN" ]
                      }
                    ],
                    "default": null
                  }
                ]
              }
            }
          ],
          "default": null
        }
      ]
    }
  ],
  "default": null
},
{noformat}

is converted into:

{noformat}
[
  "null",
  {
    "type": "record",
    "name": "entity_path",
    "fields": [
      {
        "name": "path_nodes",
        "type": [
          "null",
          {
            "type": "array",
            "items": {
              "type": "record",
              "name": "array",
              "fields": [
                {
                  "name": "id",
                  "type": [ "null", "string" ],
                  "default": null
                },
                {
                  "name": "type",
                  "type": [ "null", "string" ],
                  "default": null
                },
                {
                  "name": "exist",
                  "type": [ "null", "boolean" ],
                  "default": null
                }
              ]
            }
          }
        ],
        "default": null
      },
      {
        "name": "exist",
        "type": [ "null", "boolean" ],
        "default": null
      }
    ]
  }
]
{noformat}

A couple of questions: has anyone had similar issues, and what is the best way forward?

Edit:
I converted the dataset into pure parquet using Presto as an intermediary (CREATE TABLE AS SELECT). The result fails with a similar error, but in a different place:

{noformat}
Found recursive reference in Avro schema, which can not be processed by Spark:
{
  "type" : "record",
  "name" : "bag",
  "fields" : [ {
    "name" : "array_element",
    "type" : [ "null", {
      "type" : "record",
      "name" : "array_element",
      "fields" : [ {
        "name" : "id",
{noformat}

It looks like the parquet writer replaces arrays with synthetic records and gives them all the same name (see the sketch below for how that alone trips Spark's recursion check).

Also, the plain Spark reader works, presumably because the native Parquet read path maps the file schema straight to a StructType without going through Avro. I can open the parquet file directly using:

{noformat}
Dataset dataset = spark.read().parquet()
{noformat}
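To illustrate the name-collision theory, here is a minimal, self-contained sketch (a hypothetical schema, not taken from the dataset): two distinct record types that merely share the name "array", nested one inside the other, are enough to trip Spark's name-based recursion check. The schemas are built programmatically because Avro's own JSON parser would reject the duplicate definition outright ("Can't redefine: array"):

{code:java}
import java.util.Arrays;
import org.apache.avro.Schema;
import org.apache.spark.sql.avro.SchemaConverters;

public class FalseRecursionDemo {
  public static void main(String[] args) {
    // Inner synthetic element record, named "array" the way the Parquet
    // writer names list elements. (Field constructor assumes Avro 1.9+.)
    Schema inner = Schema.createRecord("array", null, null, false);
    inner.setFields(Arrays.asList(
        new Schema.Field("id", Schema.create(Schema.Type.STRING), null, (Object) null)));

    // Outer element record for the enclosing list: a different type,
    // but it gets the exact same name.
    Schema outer = Schema.createRecord("array", null, null, false);
    outer.setFields(Arrays.asList(
        new Schema.Field("children", Schema.createArray(inner), null, (Object) null)));

    // Spark tracks record names seen on the current path and treats the
    // repeated name as a cycle, so this throws IncompatibleSchemaException:
    // "Found recursive reference in Avro schema ..." even though nothing
    // here is actually recursive.
    SchemaConverters.toSqlType(outer);
  }
}
{code}

Both failure messages above ("array", and "bag"/"array_element" after the Presto rewrite) are consistent with this shape: each level of list nesting synthesizes an element record, and the converter reuses the same name at every level.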