gengliangwang commented on code in PR #47425:
URL: https://github.com/apache/spark/pull/47425#discussion_r1717676903


##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala:
##########
@@ -128,62 +138,110 @@ object SchemaConverters {
       case NULL => SchemaType(NullType, nullable = true)
 
       case RECORD =>
-        if (existingRecordNames.contains(avroSchema.getFullName)) {
+        val recursiveDepth: Int = existingRecordNames.getOrElse(avroSchema.getFullName, 0)
+        if (recursiveDepth > 0 && recursiveFieldMaxDepth <= 0) {
           throw new IncompatibleSchemaException(s"""
-            |Found recursive reference in Avro schema, which can not be processed by Spark:
-            |${avroSchema.toString(true)}
+            |Found recursive reference in Avro schema, which can not be processed by Spark by
+            | default: ${avroSchema.toString(true)}. Try setting the option `recursiveFieldMaxDepth`
+            | to 1 - $RECURSIVE_FIELD_MAX_DEPTH_LIMIT.
           """.stripMargin)
-        }
-        val newRecordNames = existingRecordNames + avroSchema.getFullName
-        val fields = avroSchema.getFields.asScala.map { f =>
-          val schemaType = toSqlTypeHelper(
-            f.schema(),
-            newRecordNames,
-            useStableIdForUnionType,
-            stableIdPrefixForUnionType)
-          StructField(f.name, schemaType.dataType, schemaType.nullable)
-        }
+        } else if (recursiveDepth > 0 && recursiveDepth >= recursiveFieldMaxDepth) {
+          logInfo(
+            log"The field ${MDC(FIELD_NAME, avroSchema.getFullName)} of type " +
+              log"${MDC(FIELD_TYPE, avroSchema.getType.getName)} is dropped at recursive depth " +
+              log"${MDC(RECURSIVE_DEPTH, recursiveDepth)}."
+          )
+          null
+        } else {
+          val newRecordNames =
+            existingRecordNames + (avroSchema.getFullName -> (recursiveDepth + 1))
+          val fields = avroSchema.getFields.asScala.map { f =>
+            val schemaType = toSqlTypeHelper(
+              f.schema(),
+              newRecordNames,
+              useStableIdForUnionType,
+              stableIdPrefixForUnionType,
+              recursiveFieldMaxDepth)
+            if (schemaType == null) {
+              null
+            }
+            else {
+              StructField(f.name, schemaType.dataType, schemaType.nullable)
+            }
+          }.filter(_ != null).toSeq

Review Comment:
   Don't we need to keep the null fields?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to