[ https://issues.apache.org/jira/browse/SPARK-33598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17598165#comment-17598165 ]

Santokh Singh edited comment on SPARK-33598 at 8/31/22 4:27 AM:
----------------------------------------------------------------

*Facing the same exception on Spark 3.2.2.*

*Using the Avro Maven plugin to generate Java classes from the Avro schema below.*

 

{color:#ff0000}*Exception in thread "main" 
java.lang.UnsupportedOperationException: Cannot have circular references in 
bean class, but got the circular reference of class class 
org.apache.avro.Schema*{color}
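For context, the circularity that Encoders.bean rejects can be reproduced without Spark or Avro. The sketch below is a hypothetical illustration (the Message/Descriptor classes and the hasCycle walker are stand-ins, not Spark, Avro, or protobuf code): a bean encoder walks no-arg getter return types, and fails as soon as a type already on the traversal path shows up again.

```java
import java.lang.reflect.Method;
import java.util.HashSet;
import java.util.Set;

public class CircularBeanDemo {

    // Hypothetical stand-ins for a protobuf-style cycle: Message exposes a
    // Descriptor, and the Descriptor points back at a Message.
    public static class Message {
        private Descriptor descriptor;
        public Descriptor getDescriptor() { return descriptor; }
        public void setDescriptor(Descriptor d) { this.descriptor = d; }
    }

    public static class Descriptor {
        private Message containingMessage;
        public Message getContainingMessage() { return containingMessage; }
        public void setContainingMessage(Message m) { this.containingMessage = m; }
    }

    // Walks no-arg getter return types the way a bean encoder traverses a
    // schema; returns true when a type already on the path is seen again.
    public static boolean hasCycle(Class<?> cls, Set<Class<?>> seen) {
        if (!seen.add(cls)) {
            return true; // same type revisited: circular reference
        }
        for (Method m : cls.getDeclaredMethods()) {
            if (m.getName().startsWith("get") && m.getParameterCount() == 0
                    && hasCycle(m.getReturnType(), new HashSet<>(seen))) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Message -> Descriptor -> Message: the traversal detects the cycle.
        System.out.println(hasCycle(Message.class, new HashSet<>())); // prints "true"
    }
}
```

The Avro-generated person class trips the same check because its getSchema() accessor returns org.apache.avro.Schema, which itself contains Schema-typed fields.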

*AVRO SCHEMA* 

{code:json}
[
  {
    "type": "record",
    "namespace": "kafka.avro.schema.nested",
    "name": "Address",
    "fields": [
      { "name": "streetaddress", "type": "string" },
      { "name": "city", "type": "string" }
    ]
  },
  {
    "type": "record",
    "name": "person",
    "namespace": "kafka.avro.schema.nested",
    "fields": [
      { "name": "firstname", "type": "string" },
      { "name": "lastname", "type": "string" },
      { "name": "address", "type": ["null", "Address"] }
    ]
  }
]
{code}

*-------CODE --------*

{code:java}
import kafka.avro.schema.nested.person;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.streaming.Trigger;

import java.util.concurrent.TimeoutException;

public class KafkaAvroStreamingAbris {

    public static void main(String[] args)
            throws StreamingQueryException, TimeoutException {

        SparkSession spark = SparkSession.builder()
                .appName("AvroApp")
                .master("local")
                .getOrCreate();

        Dataset<Row> df = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "127.0.0.1:9092")
                .option("subscribe", "person")
                .option("startingOffsets", "earliest")
                .load();

        // Decode the Confluent Avro payload with ABRiS, downloading the
        // latest reader schema from the schema registry.
        Dataset<Row> df2 = df
                .select(za.co.absa.abris.avro.functions.from_avro(
                        org.apache.spark.sql.functions.col("value"),
                        za.co.absa.abris.config.AbrisConfig
                                .fromConfluentAvro().downloadReaderSchemaByLatestVersion()
                                .andTopicNameStrategy("person", false)
                                .usingSchemaRegistry("http://localhost:8089")).as("data"));

        // Encoders.bean on the Avro-generated person class throws the
        // circular-reference exception here: the generated class exposes
        // getSchema(), and org.apache.avro.Schema references itself.
        Dataset<person> df3 = df2.map((MapFunction<Row, person>) row -> {
            String rr = row.toString();
            return null;
        }, Encoders.bean(person.class));

        StreamingQuery streamingQuery = df2
                .writeStream()
                .queryName("Kafka-Write")
                .format("console")
                .outputMode(OutputMode.Append())
                .trigger(Trigger.ProcessingTime(2000L))
                .start();

        streamingQuery.awaitTermination();
    }
}
{code}
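A common workaround, pending the improvement this ticket asks for, is to map into plain POJOs that mirror the Avro record fields but carry no schema accessor, so the bean encoder never reaches org.apache.avro.Schema. A minimal sketch (PersonBean and AddressBean are hypothetical names, not classes from the project above):

```java
import java.io.Serializable;

// Hypothetical plain beans mirroring the Avro records; unlike the
// Avro-generated classes they expose no getSchema(), so a bean
// encoder finds no circular reference.
public class PersonBean implements Serializable {
    private String firstname;
    private String lastname;
    private AddressBean address;

    public String getFirstname() { return firstname; }
    public void setFirstname(String firstname) { this.firstname = firstname; }
    public String getLastname() { return lastname; }
    public void setLastname(String lastname) { this.lastname = lastname; }
    public AddressBean getAddress() { return address; }
    public void setAddress(AddressBean address) { this.address = address; }

    public static class AddressBean implements Serializable {
        private String streetaddress;
        private String city;

        public String getStreetaddress() { return streetaddress; }
        public void setStreetaddress(String streetaddress) { this.streetaddress = streetaddress; }
        public String getCity() { return city; }
        public void setCity(String city) { this.city = city; }
    }
}
```

With such a bean, Encoders.bean(PersonBean.class) should succeed, and the MapFunction can copy fields out of the decoded Row into the bean instead of returning the Avro-generated person.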



> Support Java Class with circular references
> -------------------------------------------
>
>                 Key: SPARK-33598
>                 URL: https://issues.apache.org/jira/browse/SPARK-33598
>             Project: Spark
>          Issue Type: Improvement
>          Components: Java API
>    Affects Versions: 3.1.2
>            Reporter: jacklzg
>            Priority: Minor
>
> If the target Java data class has a circular reference, Spark fails fast 
> when creating the Dataset or running Encoders.
>  
> For example, a protobuf-generated class carries a reference to its 
> Descriptor, so there is no way to build a Dataset from the protobuf class.
> From this line
> {color:#7a869a}Encoders.bean(ProtoBuffOuterClass.ProtoBuff.class);{color}
>  
> it throws immediately:
>  
> {quote}Exception in thread "main" java.lang.UnsupportedOperationException: 
> Cannot have circular references in bean class, but got the circular reference 
> of class class com.google.protobuf.Descriptors$Descriptor
> {quote}
>  
> Can we add a parameter, for example,
>  
> {code:java}
> Encoders.bean(Class<T> clazz, List<Field> fieldsToIgnore);{code}
> or
>  
> {code:java}
> Encoders.bean(Class<T> clazz, boolean skipCircularRefField);{code}
>  
>  which, instead of throwing an exception at 
> [https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala#L556],
>  would skip the field:
>  
> {code:java}
> if (seenTypeSet.contains(t)) {
>   if (skipCircularRefField)
>     println("field skipped") // just skip this field
>   else
>     throw new UnsupportedOperationException(
>       s"cannot have circular references in class, but got the circular reference of class $t")
> }
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
