WweiL commented on code in PR #47425:
URL: https://github.com/apache/spark/pull/47425#discussion_r1689265340


##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala:
##########
@@ -170,4 +173,23 @@ private[sql] object AvroOptions extends DataSourceOptions {
   // When STABLE_ID_FOR_UNION_TYPE is enabled, the option allows to configure the prefix for fields
   // of Avro Union type.
   val STABLE_ID_PREFIX_FOR_UNION_TYPE = newOption("stableIdentifierPrefixForUnionType")
+
+  /**
+   * Adds support for recursive fields. If this option is not specified or is set to 0, recursive
+   * fields are not permitted. Setting it to 1 drops all recursive fields, 2 allows recursive
+   * fields to be recursed once, and 3 allows it to be recursed twice and so on, up to 15.
+   * Values larger than 15 are not allowed in order avoid inadvertently creating very large schemas.

Review Comment:
   Typo: "in order avoid" should be "in order to avoid".



##########
connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala:
##########
@@ -2252,6 +2252,151 @@ abstract class AvroSuite
     """.stripMargin)
   }
 
+  private def checkSparkSchemaEquals(
+      avroSchema: String, expectedSchema: StructType, recursiveFieldMaxDepth: Int): Unit = {
+    val sparkSchema =
+      SchemaConverters.toSqlType(
+        new Schema.Parser().parse(avroSchema), false, "", recursiveFieldMaxDepth).dataType
+
+    assert(sparkSchema === expectedSchema)
+  }
+
+  test("Translate recursive schema - 1") {
+    val avroSchema = """
+      |{
+      |  "type": "record",
+      |  "name": "LongList",
+      |  "fields" : [
+      |    {"name": "value", "type": "long"},             // each element has 
a long
+      |    {"name": "next", "type": ["null", "LongList"]} // optional next 
element
+      |  ]
+      |}
+    """.stripMargin
+    val nonRecursiveFields = new StructType().add("value", LongType, nullable = false)
+    var expectedSchema = nonRecursiveFields
+    for (i <- 1 to 5) {
+      checkSparkSchemaEquals(avroSchema, expectedSchema, i)
+      expectedSchema = nonRecursiveFields.add("next", expectedSchema)
+    }
+  }
+
+  test("Translate recursive schema - 2") {
+    val avroSchema = """
+      |{
+      |  "type": "record",
+      |  "name": "LongList",
+      |  "fields": [
+      |    {
+      |      "name": "value",
+      |      "type": {
+      |        "type": "record",
+      |        "name": "foo",
+      |        "fields": [
+      |          {
+      |            "name": "parent",
+      |            "type": "LongList"
+      |          }
+      |        ]
+      |      }
+      |    }
+      |  ]
+      |}
+    """.stripMargin
+    val nonRecursiveFields = new StructType().add("value", StructType(Seq()), nullable = false)
+    var expectedSchema = nonRecursiveFields
+    for (i <- 1 to 5) {
+      checkSparkSchemaEquals(avroSchema, expectedSchema, i)
+      expectedSchema = new StructType().add("value",
+          new StructType().add("parent", expectedSchema, nullable = false), nullable = false)
+    }
+  }
+
+  test("Translate recursive schema - 3") {
+    val avroSchema = """
+      |{
+      |  "type": "record",
+      |  "name": "LongList",
+      |  "fields" : [
+      |    {"name": "value", "type": "long"},
+      |    {"name": "array", "type": {"type": "array", "items": "LongList"}}
+      |  ]
+      |}
+    """.stripMargin
+    val nonRecursiveFields = new StructType().add("value", LongType, nullable = false)
+    var expectedSchema = nonRecursiveFields
+    for (i <- 1 to 5) {
+      checkSparkSchemaEquals(avroSchema, expectedSchema, i)
+      expectedSchema =
+        nonRecursiveFields.add("array", new ArrayType(expectedSchema, false), nullable = false)
+    }
+  }
+
+  test("Translate recursive schema - 4") {

Review Comment:
   Can we have better names for these tests?



##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala:
##########
@@ -128,62 +135,109 @@ object SchemaConverters {
       case NULL => SchemaType(NullType, nullable = true)
 
       case RECORD =>
-        if (existingRecordNames.contains(avroSchema.getFullName)) {
+        val recursiveDepth: Int = existingRecordNames.getOrElse(avroSchema.getFullName, 0)
+        if (recursiveDepth > 0 && (recursiveFieldMaxDepth <= 0 || recursiveFieldMaxDepth > 15)) {
           throw new IncompatibleSchemaException(s"""
-            |Found recursive reference in Avro schema, which can not be processed by Spark:
-            |${avroSchema.toString(true)}
+            |Found recursive reference in Avro schema, which can not be processed by Spark by

Review Comment:
   IIRC protobuf does something similar here, but this logic looks a bit weird. If we do want to limit the max recursive depth, I feel it should be checked when the option is parsed, throwing an `IllegalArgumentException`.



##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala:
##########
@@ -128,62 +135,109 @@ object SchemaConverters {
       case NULL => SchemaType(NullType, nullable = true)
 
       case RECORD =>
-        if (existingRecordNames.contains(avroSchema.getFullName)) {
+        val recursiveDepth: Int = existingRecordNames.getOrElse(avroSchema.getFullName, 0)
+        if (recursiveDepth > 0 && (recursiveFieldMaxDepth <= 0 || recursiveFieldMaxDepth > 15)) {
           throw new IncompatibleSchemaException(s"""
-            |Found recursive reference in Avro schema, which can not be processed by Spark:
-            |${avroSchema.toString(true)}
+            |Found recursive reference in Avro schema, which can not be processed by Spark by
+            | default: ${avroSchema.toString(true)}. Try setting the option `recursiveFieldMaxDepth`
+            | to 1 - 15. Going beyond 15 levels of recursion is not allowed.
           """.stripMargin)
-        }
-        val newRecordNames = existingRecordNames + avroSchema.getFullName
-        val fields = avroSchema.getFields.asScala.map { f =>
-          val schemaType = toSqlTypeHelper(
-            f.schema(),
-            newRecordNames,
-            useStableIdForUnionType,
-            stableIdPrefixForUnionType)
-          StructField(f.name, schemaType.dataType, schemaType.nullable)
-        }
+        } else if (recursiveDepth > 0 && recursiveDepth >= recursiveFieldMaxDepth) {
+          log.info(

Review Comment:
   Please try to use the new MDC logging, example: https://github.com/apache/spark/pull/46192
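
   For illustration, a rough sketch of the structured-logging pattern introduced in that PR; `FIELD_NAME` is an assumed `LogKeys` entry (a suitable key may need to be added if none fits), and the implicit enabling the `log"..."` interpolator may require an extra import depending on the Spark version:

   ```scala
   import org.apache.spark.internal.{Logging, MDC}
   import org.apache.spark.internal.LogKeys.FIELD_NAME

   // Hedged sketch only, not the PR's actual code.
   class RecursiveFieldLogger extends Logging {
     def logDroppedField(fieldName: String): Unit = {
       // MDC attaches the value to the message as structured metadata
       // instead of plain string interpolation.
       logInfo(log"Dropped recursive field ${MDC(FIELD_NAME, fieldName)} because " +
         log"it exceeded the configured recursiveFieldMaxDepth.")
     }
   }
   ```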



##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala:
##########
@@ -170,4 +173,23 @@ private[sql] object AvroOptions extends DataSourceOptions {
   // When STABLE_ID_FOR_UNION_TYPE is enabled, the option allows to configure the prefix for fields
   // of Avro Union type.
   val STABLE_ID_PREFIX_FOR_UNION_TYPE = newOption("stableIdentifierPrefixForUnionType")
+
+  /**
+   * Adds support for recursive fields. If this option is not specified or is set to 0, recursive
+   * fields are not permitted. Setting it to 1 drops all recursive fields, 2 allows recursive
+   * fields to be recursed once, and 3 allows it to be recursed twice and so on, up to 15.
+   * Values larger than 15 are not allowed in order avoid inadvertently creating very large schemas.

Review Comment:
   I feel this should be a Spark conf.
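
   For illustration, a hypothetical sketch of the same knob as a SQL conf entry (the conf name and doc text are made up here, not from this PR):

   ```scala
   // Hedged sketch of a SQLConf definition, following the builder pattern used
   // in org.apache.spark.sql.internal.SQLConf.
   val AVRO_RECURSIVE_FIELD_MAX_DEPTH =
     buildConf("spark.sql.avro.recursiveFieldMaxDepth")
       .doc("Maximum depth to which recursive fields in Avro schemas are expanded. " +
         "-1 or 0 means recursive fields are not permitted.")
       .intConf
       .checkValue(v => v >= -1 && v <= 15, "The value must be in [-1, 15].")
       .createWithDefault(-1)
   ```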



##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala:
##########
@@ -170,4 +173,23 @@ private[sql] object AvroOptions extends DataSourceOptions {
   // When STABLE_ID_FOR_UNION_TYPE is enabled, the option allows to configure the prefix for fields
   // of Avro Union type.
   val STABLE_ID_PREFIX_FOR_UNION_TYPE = newOption("stableIdentifierPrefixForUnionType")
+
+  /**
+   * Adds support for recursive fields. If this option is not specified or is set to 0, recursive
+   * fields are not permitted. Setting it to 1 drops all recursive fields, 2 allows recursive
+   * fields to be recursed once, and 3 allows it to be recursed twice and so on, up to 15.
+   * Values larger than 15 are not allowed in order avoid inadvertently creating very large schemas.

Review Comment:
   Does protobuf also have a max depth of 15?



##########
connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSerdeSuite.scala:
##########
@@ -228,7 +228,8 @@ object AvroSerdeSuite {
         RebaseSpec(CORRECTED),
         new NoopFilters,
         false,
-        "")
+        "",
+        -1)

Review Comment:
   Should we just add a default value in the definition to prevent multiple API changes?


