[ https://issues.apache.org/jira/browse/SPARK-33598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17598165#comment-17598165 ]
Santokh Singh edited comment on SPARK-33598 at 8/31/22 4:27 AM: ---------------------------------------------------------------- *Facing same exception, Spark Version 3.2.2* *Using avro mvn plugin to generate java class from below avro schema,* {color:#ff0000}*Exception in thread "main" java.lang.UnsupportedOperationException: Cannot have circular references in bean class, but got the circular reference of class class org.apache.avro.Schema*{color}{*}{{*}} *AVRO SCHEMA* [ { "type": "record", "namespace":"kafka.avro.schema.nested", "name": "Address", "fields": [ { "name": "streetaddress", "type": "string"} , {"name": "city", "type": "string" } ] }, { "type": "record", "name": "person", "namespace":"kafka.avro.schema.nested", "fields": [ { "name": "firstname", "type": "string"} , { "name": "lastname", "type": "string" } , { "name": "address", "type": ["null","Address"] } ] } ] *-------CODE --------* import kafka.avro.schema.nested.person; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.streaming.OutputMode; import org.apache.spark.sql.streaming.StreamingQuery; import org.apache.spark.sql.streaming.StreamingQueryException; import org.apache.spark.sql.streaming.Trigger; import za.co.absa.abris.avro.functions.*; import za.co.absa.abris.config.AbrisConfig; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; import java.util.concurrent.TimeoutException; public class KafkaAvroStreamingAbris { public static void main(String[] args) throws IOException, StreamingQueryException, TimeoutException { SparkSession spark = SparkSession.builder() .appName("AvroApp") .master("local") .getOrCreate(); Dataset df = spark.readStream() .format("kafka") .option("kafka.bootstrap.servers", "127.0.0.1:9092") .option("subscribe", "person") .option("startingOffsets", "earliest") 
.load(); Dataset df2 = df .select(za.co.absa.abris.avro.functions.from_avro( org.apache.spark.sql.functions.col("value"), za.co.absa.abris.config.AbrisConfig .fromConfluentAvro().downloadReaderSchemaByLatestVersion() .andTopicNameStrategy("person",false) .usingSchemaRegistry("http://localhost:8089")).as("data")); Dataset df3 = df2.map((MapFunction<Row,person>) row-> { String rr = row.toString(); return null; } , Encoders.bean(PersonBean.class)); StreamingQuery streamingQuery = df2 .writeStream() .queryName("Kafka-Write") .format("console") .outputMode(OutputMode.Append()) .trigger(Trigger.ProcessingTime(Long.parseLong("2000"))) .start(); streamingQuery.awaitTermination(); } } was (Author: santokhsdg): *Facing same exception, Spark Version 3.2.2* *Using avro mvn plugin to generate java class from below avro schema,* {color:#FF0000}*Exception in thread "main" java.lang.UnsupportedOperationException: Cannot have circular references in bean class, but got the circular reference of class class org.apache.avro.Schema*{color}{*}{*} *--------- AVRO SCHEMA ---------* [ { "type": "record", "namespace":"kafka.avro.schema.nested", "name": "Address", "fields": [{ "name": "streetaddress", "type": "string"}, {"name": "city", "type": "string" }] }, { "type": "record", "name": "person", "namespace":"kafka.avro.schema.nested", "fields": [{ "name": "firstname", "type": "string"}, { "name": "lastname", "type": "string" },{ "name": "address", "type": ["null","Address"] }] } ] *-------CODE --------* import kafka.avro.schema.nested.person; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.streaming.OutputMode; import org.apache.spark.sql.streaming.StreamingQuery; import org.apache.spark.sql.streaming.StreamingQueryException; import org.apache.spark.sql.streaming.Trigger; import 
za.co.absa.abris.avro.functions.*; import za.co.absa.abris.config.AbrisConfig; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; import java.util.concurrent.TimeoutException; public class KafkaAvroStreamingAbris { public static void main(String[] args) throws IOException, StreamingQueryException, TimeoutException { SparkSession spark = SparkSession.builder() .appName("AvroApp") .master("local") .getOrCreate(); Dataset df = spark.readStream() .format("kafka") .option("kafka.bootstrap.servers", "127.0.0.1:9092") .option("subscribe", "person") .option("startingOffsets", "earliest") .load(); Dataset df2 = df .select(za.co.absa.abris.avro.functions.from_avro( org.apache.spark.sql.functions.col("value"), za.co.absa.abris.config.AbrisConfig .fromConfluentAvro().downloadReaderSchemaByLatestVersion() .andTopicNameStrategy("person",false) .usingSchemaRegistry("http://localhost:8089")).as("data")); Dataset df3 = df2.map((MapFunction<Row,person>) row->{ String rr = row.toString(); return null; }, Encoders.bean(PersonBean.class)); StreamingQuery streamingQuery = df2 .writeStream() .queryName("Kafka-Write") .format("console") .outputMode(OutputMode.Append()) .trigger(Trigger.ProcessingTime(Long.parseLong("2000"))) .start(); streamingQuery.awaitTermination(); } } > Support Java Class with circular references > ------------------------------------------- > > Key: SPARK-33598 > URL: https://issues.apache.org/jira/browse/SPARK-33598 > Project: Spark > Issue Type: Improvement > Components: Java API > Affects Versions: 3.1.2 > Reporter: jacklzg > Priority: Minor > > If the target Java data class has a circular reference, Spark will fail fast > from creating the Dataset or running Encoders. > > For example, with protobuf class, there is a reference with Descriptor, there > is no way to build a dataset from the protobuf class. 
> From this line > {color:#7a869a}Encoders.bean(ProtoBuffOuterClass.ProtoBuff.class);{color} > > It will throw immediately > > {quote}Exception in thread "main" java.lang.UnsupportedOperationException: > Cannot have circular references in bean class, but got the circular reference > of class class com.google.protobuf.Descriptors$Descriptor > {quote} > > Can we add a parameter, for example, > > {code:java} > Encoders.bean(Class<T> clas, List<Fields> fieldsToIgnore);{code} > or > > {code:java} > Encoders.bean(Class<T> clas, boolean skipCircularRefField);{code} > > which subsequently, instead of throwing an exception @ > [https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala#L556], > instead skips the field. > > {code:java} > if (seenTypeSet.contains(t)) { > if(skipCircularRefField) > println("field skipped") //just skip this field > else throw new UnsupportedOperationException( s"cannot have circular > references in class, but got the circular reference of class $t") > } > {code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org