This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f2afb88d42 [SPARK-43333][SQL] Allow Avro to convert union type to SQL 
with field name stable with type
8f2afb88d42 is described below

commit 8f2afb88d42af04f36c84972d9ebcb5dabc91260
Author: Siying Dong <siying.d...@databricks.com>
AuthorDate: Wed May 31 13:32:02 2023 -0700

    [SPARK-43333][SQL] Allow Avro to convert union type to SQL with field name 
stable with type
    
    ### What changes were proposed in this pull request?
    Introduce the Avro option "enableStableIdentifiersForUnionType". If it is set 
to true (the default remains false), an Avro union is converted to a SQL struct 
whose field names are "member_" + the lowercased type name. This keeps each field 
name stable with respect to its type name.
    
    ### Why are the changes needed?
    The purpose of this is twofold:
    
    1. To allow adding or removing types in the union without affecting the field 
names of the other member types. With positional names, if the added or removed 
type is not ordered last, existing queries referencing "member2" may need to be 
rewritten to reference "member1" or "member3".
    2. Referencing the type name in a query is more readable than referencing 
"member0".
    For example, our system produces an avro schema from a Java type structure 
where subtyping maps to union types whose members are ordered 
lexicographically. Adding a subtype can therefore easily result in all 
references to "member2" needing to be updated to "member3".
    
    ### Does this PR introduce _any_ user-facing change?
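    No change to existing behavior. A new opt-in read option, "enableStableIdentifiersForUnionType" (default false), is added.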
    
    ### How was this patch tested?
    Add a unit test that covers all types supported in union, as well as some 
potential name conflict cases.
    
    Closes #41263 from siying/avro_stable_union.
    
    Authored-by: Siying Dong <siying.d...@databricks.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../apache/spark/sql/avro/AvroDataToCatalyst.scala |   2 +-
 .../org/apache/spark/sql/avro/AvroOptions.scala    |  10 +
 .../org/apache/spark/sql/avro/AvroUtils.scala      |   2 +-
 .../apache/spark/sql/avro/SchemaConverters.scala   |  62 ++++--
 .../apache/spark/sql/avro/AvroFunctionsSuite.scala |   9 +-
 .../org/apache/spark/sql/avro/AvroSuite.scala      | 222 ++++++++++++++++++---
 docs/sql-data-sources-avro.md                      |   8 +-
 7 files changed, 274 insertions(+), 41 deletions(-)

diff --git 
a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala
 
b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala
index f8718edd97f..59f2999bdd3 100644
--- 
a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala
+++ 
b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala
@@ -39,7 +39,7 @@ private[sql] case class AvroDataToCatalyst(
   override def inputTypes: Seq[AbstractDataType] = Seq(BinaryType)
 
   override lazy val dataType: DataType = {
-    val dt = SchemaConverters.toSqlType(expectedSchema).dataType
+    val dt = SchemaConverters.toSqlType(expectedSchema, options).dataType
     parseMode match {
       // With PermissiveMode, the output Catalyst row might contain columns of 
null values for
       // corrupt records, even if some of the columns are not nullable in the 
user-provided schema.
diff --git 
a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala 
b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
index 95001bb8150..c8057ca5879 100644
--- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
+++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
@@ -130,6 +130,9 @@ private[sql] class AvroOptions(
   val datetimeRebaseModeInRead: String = parameters
     .get(DATETIME_REBASE_MODE)
     .getOrElse(SQLConf.get.getConf(SQLConf.AVRO_REBASE_MODE_IN_READ))
+
+  val useStableIdForUnionType: Boolean =
+    parameters.get(STABLE_ID_FOR_UNION_TYPE).map(_.toBoolean).getOrElse(false)
 }
 
 private[sql] object AvroOptions extends DataSourceOptions {
@@ -154,4 +157,11 @@ private[sql] object AvroOptions extends DataSourceOptions {
   // datasource similarly to the SQL config 
`spark.sql.avro.datetimeRebaseModeInRead`,
   // and can be set to the same values: `EXCEPTION`, `LEGACY` or `CORRECTED`.
   val DATETIME_REBASE_MODE = newOption("datetimeRebaseMode")
+  // If it is set to true, Avro schema is deserialized into Spark SQL schema, 
and the Avro Union
+  // type is transformed into a structure where the field names remain 
consistent with their
+  // respective types. The resulting field names are converted to lowercase, 
e.g. member_int or
+  // member_string. If two user-defined type names or a user-defined type name 
and a built-in
+  // type name are identical regardless of case, an exception will be raised. 
However, in other
+  // cases, the field names can be uniquely identified.
+  val STABLE_ID_FOR_UNION_TYPE = 
newOption("enableStableIdentifiersForUnionType")
 }
diff --git 
a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala 
b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
index e1966bd1041..2554106d78e 100644
--- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
+++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
@@ -61,7 +61,7 @@ private[sql] object AvroUtils extends Logging {
           new 
FileSourceOptions(CaseInsensitiveMap(options)).ignoreCorruptFiles)
       }
 
-    SchemaConverters.toSqlType(avroSchema).dataType match {
+    SchemaConverters.toSqlType(avroSchema, options).dataType match {
       case t: StructType => Some(t)
       case _ => throw new RuntimeException(
         s"""Avro schema cannot be converted to a Spark SQL StructType:
diff --git 
a/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
 
b/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
index f616cfa9b5d..e2e2739e7cf 100644
--- 
a/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
+++ 
b/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
@@ -17,7 +17,10 @@
 
 package org.apache.spark.sql.avro
 
+import java.util.Locale
+
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 import org.apache.avro.{LogicalTypes, Schema, SchemaBuilder}
 import org.apache.avro.LogicalTypes.{Date, Decimal, LocalTimestampMicros, 
LocalTimestampMillis, TimestampMicros, TimestampMillis}
@@ -49,13 +52,19 @@ object SchemaConverters {
    * @since 2.4.0
    */
   def toSqlType(avroSchema: Schema): SchemaType = {
-    toSqlTypeHelper(avroSchema, Set.empty)
+    toSqlTypeHelper(avroSchema, Set.empty, AvroOptions(Map()))
+  }
+  def toSqlType(avroSchema: Schema, options: Map[String, String]): SchemaType 
= {
+    toSqlTypeHelper(avroSchema, Set.empty, AvroOptions(options))
   }
 
   // The property specifies Catalyst type of the given field
   private val CATALYST_TYPE_PROP_NAME = "spark.sql.catalyst.type"
 
-  private def toSqlTypeHelper(avroSchema: Schema, existingRecordNames: 
Set[String]): SchemaType = {
+  private def toSqlTypeHelper(
+      avroSchema: Schema,
+      existingRecordNames: Set[String],
+      avroOptions: AvroOptions): SchemaType = {
     avroSchema.getType match {
       case INT => avroSchema.getLogicalType match {
         case _: Date => SchemaType(DateType, nullable = false)
@@ -106,20 +115,23 @@ object SchemaConverters {
         }
         val newRecordNames = existingRecordNames + avroSchema.getFullName
         val fields = avroSchema.getFields.asScala.map { f =>
-          val schemaType = toSqlTypeHelper(f.schema(), newRecordNames)
+          val schemaType = toSqlTypeHelper(f.schema(), newRecordNames, 
avroOptions)
           StructField(f.name, schemaType.dataType, schemaType.nullable)
         }
 
         SchemaType(StructType(fields.toArray), nullable = false)
 
       case ARRAY =>
-        val schemaType = toSqlTypeHelper(avroSchema.getElementType, 
existingRecordNames)
+        val schemaType = toSqlTypeHelper(
+          avroSchema.getElementType,
+          existingRecordNames,
+          avroOptions)
         SchemaType(
           ArrayType(schemaType.dataType, containsNull = schemaType.nullable),
           nullable = false)
 
       case MAP =>
-        val schemaType = toSqlTypeHelper(avroSchema.getValueType, 
existingRecordNames)
+        val schemaType = toSqlTypeHelper(avroSchema.getValueType, 
existingRecordNames, avroOptions)
         SchemaType(
           MapType(StringType, schemaType.dataType, valueContainsNull = 
schemaType.nullable),
           nullable = false)
@@ -129,26 +141,50 @@ object SchemaConverters {
           // In case of a union with null, eliminate it and make a recursive 
call
           val remainingUnionTypes = AvroUtils.nonNullUnionBranches(avroSchema)
           if (remainingUnionTypes.size == 1) {
-            toSqlTypeHelper(remainingUnionTypes.head, 
existingRecordNames).copy(nullable = true)
-          } else {
-            toSqlTypeHelper(Schema.createUnion(remainingUnionTypes.asJava), 
existingRecordNames)
+            toSqlTypeHelper(remainingUnionTypes.head, existingRecordNames, 
avroOptions)
               .copy(nullable = true)
+          } else {
+            toSqlTypeHelper(
+              Schema.createUnion(remainingUnionTypes.asJava),
+              existingRecordNames,
+              avroOptions).copy(nullable = true)
           }
         } else avroSchema.getTypes.asScala.map(_.getType).toSeq match {
           case Seq(t1) =>
-            toSqlTypeHelper(avroSchema.getTypes.get(0), existingRecordNames)
+            toSqlTypeHelper(avroSchema.getTypes.get(0), existingRecordNames, 
avroOptions)
           case Seq(t1, t2) if Set(t1, t2) == Set(INT, LONG) =>
             SchemaType(LongType, nullable = false)
           case Seq(t1, t2) if Set(t1, t2) == Set(FLOAT, DOUBLE) =>
             SchemaType(DoubleType, nullable = false)
           case _ =>
-            // Convert complex unions to struct types where field names are 
member0, member1, etc.
-            // This is consistent with the behavior when converting between 
Avro and Parquet.
+            // When avroOptions.useStableIdForUnionType is false, convert 
complex unions to struct
+            // types where field names are member0, member1, etc. This is 
consistent with the
+            // behavior when converting between Avro and Parquet.
+            // If avroOptions.useStableIdForUnionType is true, include type 
name in field names
+            // so that users can drop or add fields and keep field name stable.
+            val fieldNameSet : mutable.Set[String] = mutable.Set()
             val fields = avroSchema.getTypes.asScala.zipWithIndex.map {
               case (s, i) =>
-                val schemaType = toSqlTypeHelper(s, existingRecordNames)
+                val schemaType = toSqlTypeHelper(s, existingRecordNames, 
avroOptions)
+
+                val fieldName = if (avroOptions.useStableIdForUnionType) {
+                  // Avro's field name may be case sensitive, so field names 
for two named type
+                  // could be "a" and "A" and we need to distinguish them. In 
this case, we throw
+                  // an exception.
+                  val temp_name = 
s"member_${s.getName.toLowerCase(Locale.ROOT)}"
+                  if (fieldNameSet.contains(temp_name)) {
+                    throw new IncompatibleSchemaException(
+                      "Cannot generate stable indentifier for Avro union type 
due to name " +
+                      s"conflict of type name ${s.getName}")
+                  }
+                  fieldNameSet.add(temp_name)
+                  temp_name
+                } else {
+                  s"member$i"
+                }
+
                 // All fields are nullable because only one of them is set at 
a time
-                StructField(s"member$i", schemaType.dataType, nullable = true)
+                StructField(fieldName, schemaType.dataType, nullable = true)
             }
 
             SchemaType(StructType(fields.toArray), nullable = false)
diff --git 
a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala
 
b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala
index 7c79162e896..62f61da75b5 100644
--- 
a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala
+++ 
b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala
@@ -260,8 +260,13 @@ class AvroFunctionsSuite extends QueryTest with 
SharedSparkSession {
       |  ]
       |}
     """.stripMargin
-    val avroSchema = AvroOptions(Map("avroSchema" -> 
avroTypeStruct)).schema.get
-    val sparkSchema = 
SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
+    val options = Map("avroSchema" -> avroTypeStruct)
+    val avroOptions = AvroOptions(options)
+    val avroSchema = avroOptions.schema.get
+    val sparkSchema = SchemaConverters
+      .toSqlType(avroSchema, options)
+      .dataType
+      .asInstanceOf[StructType]
 
     val df = spark.range(5).select($"id")
     val structDf = df.select(struct($"id").as("struct"))
diff --git 
a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala 
b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index a09cbf53ab9..97260e6eea6 100644
--- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -98,6 +98,53 @@ abstract class AvroSuite
     }, new GenericDatumReader[Any]()).getSchema.toString(false)
   }
 
+  // Check whether an Avro schema of union type is converted to SQL in an 
expected way, when the
+  // stable ID option is on.
+  //
+  // @param types           Avro types contained in the Avro union type
+  // @param expectedSchema  expected SQL schema, provided in DDL string form
+  // @param fieldsAndRow    A list of rows to be appended to the Avro file and 
the expected
+  // converted SQL rows
+  private def checkUnionStableId(
+      types: List[Schema],
+      expectedSchema: String,
+      fieldsAndRow: Seq[(Any, Row)]): Unit = {
+    withTempDir { dir =>
+      val unionType = Schema.createUnion(
+        types.asJava
+      )
+      val fields =
+        Seq(new Field("field1", unionType, "doc", 
null.asInstanceOf[AnyVal])).asJava
+      val schema = Schema.createRecord("name", "docs", "namespace", false)
+      schema.setFields(fields)
+      val datumWriter = new GenericDatumWriter[GenericRecord](schema)
+      val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
+      dataFileWriter.create(schema, new File(s"$dir.avro"))
+
+      fieldsAndRow.map(_._1).foreach { f =>
+        val avroRec = new GenericData.Record(schema)
+        f match {
+          case a : Array[Byte] =>
+            val fixedSchema = SchemaBuilder.fixed("fixed_name").size(4)
+            avroRec.put("field1", new Fixed(fixedSchema, a));
+          case other =>
+            avroRec.put("field1", other)
+        }
+        dataFileWriter.append(avroRec)
+      }
+      dataFileWriter.flush()
+      dataFileWriter.close()
+
+      val df = spark
+        .read.
+        format("avro")
+        .option(AvroOptions.STABLE_ID_FOR_UNION_TYPE, "true")
+        .load(s"$dir.avro")
+      assert(df.schema === StructType.fromDDL("field1 " + expectedSchema))
+      assert(df.collect().toSet == fieldsAndRow.map(fr => Row(fr._2)).toSet)
+    }
+  }
+
   private def getResourceAvroFilePath(name: String): String = {
     Thread.currentThread().getContextClassLoader.getResource(name).toString
   }
@@ -271,29 +318,157 @@ abstract class AvroSuite
     }
   }
 
-  test("SPARK-27858 Union type: More than one non-null type") {
-    withTempDir { dir =>
-      val complexNullUnionType = Schema.createUnion(
-        List(Schema.create(Type.INT), Schema.create(Type.NULL), 
Schema.create(Type.STRING)).asJava)
-      val fields = Seq(
-        new Field("field1", complexNullUnionType, "doc", 
null.asInstanceOf[AnyVal])).asJava
-      val schema = Schema.createRecord("name", "docs", "namespace", false)
-      schema.setFields(fields)
-      val datumWriter = new GenericDatumWriter[GenericRecord](schema)
-      val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
-      dataFileWriter.create(schema, new File(s"$dir.avro"))
-      val avroRec = new GenericData.Record(schema)
-      avroRec.put("field1", 42)
-      dataFileWriter.append(avroRec)
-      val avroRec2 = new GenericData.Record(schema)
-      avroRec2.put("field1", "Alice")
-      dataFileWriter.append(avroRec2)
-      dataFileWriter.flush()
-      dataFileWriter.close()
+  // This test exercises the Avro option "enableStableIdentifiersForUnionType". It adds 
all types into
+  // a union and validates that they are converted to the expected SQL field names. The 
test also creates
+  // different cases that might cause field name conflicts and checks that they are 
handled properly.
+  test("SPARK-43333: Stable field names when converting Union type") {
+    checkUnionStableId(
+      List(Type.INT, Type.NULL, Type.STRING).map(Schema.create(_)),
+      "struct<member_int: int, member_string: string>",
+      Seq(
+        (42, Row(42, null)),
+        ("Alice", Row(null, "Alice"))))
 
-      val df = spark.read.format("avro").load(s"$dir.avro")
-      assert(df.schema === StructType.fromDDL("field1 struct<member0: int, 
member1: string>"))
-      assert(df.collect().toSet == Set(Row(Row(42, null)), Row(Row(null, 
"Alice"))))
+    checkUnionStableId(
+      List( Type.FLOAT, Type.BOOLEAN, Type.BYTES, Type.DOUBLE, 
Type.LONG).map(Schema.create(_)),
+      "struct<member_float: float, member_boolean: boolean, " +
+        "member_bytes: binary, member_double: double, member_long: long>",
+      Seq(
+        (true, Row(null, true, null, null, null)),
+        (42L, Row(null, null, null, null, 42L)),
+        (42F, Row(42.0, null, null, null, null)),
+       (42D, Row(null, null, null, 42D, null))))
+
+    checkUnionStableId(
+      List(
+        Schema.createArray(Schema.create(Type.FLOAT)),
+        Schema.createMap(Schema.create(Schema.Type.INT))),
+      "struct<member_array: array<float>, member_map: map<string, int>>",
+      Seq())
+
+    checkUnionStableId(
+      List(
+        Schema.createEnum("myenum", "", null, List[String]("e1", "e2").asJava),
+        Schema.createRecord("myrecord", "", null, false,
+          List[Schema.Field](new Schema.Field("f", 
Schema.createFixed("myfield", "", null, 6)))
+            .asJava),
+        Schema.createRecord("myrecord2", "", null, false,
+          List[Schema.Field](new Schema.Field("f", Schema.create(Type.FLOAT)))
+            .asJava)),
+      "struct<member_myenum: string, member_myrecord: struct<f: binary>, " +
+                    "member_myrecord2: struct<f: float>>",
+      Seq())
+
+    {
+      val e = intercept[Exception] {
+        checkUnionStableId(
+          List(
+            Schema.createFixed("MYFIELD2", "", null, 6),
+            Schema.createFixed("myfield1", "", null, 6),
+            Schema.createFixed("myfield2", "", null, 9)),
+          "",
+          Seq())
+      }
+      assert(e.getMessage.contains("Cannot generate stable indentifier"))
+    }
+    {
+      val e = intercept[Exception] {
+        checkUnionStableId(
+          List(
+            Schema.createFixed("ARRAY", "", null, 6),
+            Schema.createArray(Schema.create(Type.STRING))),
+          "",
+          Seq())
+      }
+      assert(e.getMessage.contains("Cannot generate stable indentifier"))
+    }
+    // Two array types or two map types are not allowed in union.
+    {
+      val e = intercept[Exception] {
+        Schema.createUnion(
+          List(
+           Schema.createArray(Schema.create(Type.FLOAT)),
+           Schema.createArray(Schema.create(Type.STRING))).asJava)
+      }
+      assert(e.getMessage.contains("Duplicate in union"))
+    }
+    {
+      val e = intercept[Exception] {
+        Schema.createUnion(
+          List(
+            Schema.createMap(Schema.create(Type.FLOAT)),
+            Schema.createMap(Schema.create(Type.STRING))).asJava)
+      }
+      assert(e.getMessage.contains("Duplicate in union"))
+    }
+
+    // Somehow Avro allows named type "array", but doesn't allow an array type 
in the same union.
+    {
+      val e = intercept[Exception] {
+        Schema.createUnion(
+          List(
+            Schema.createArray(Schema.create(Type.FLOAT)),
+            Schema.createFixed("array", "", null, 6)
+          ).asJava
+        )
+      }
+      assert(e.getMessage.contains("Duplicate in union"))
+    }
+    {
+      val e = intercept[Exception] {
+        Schema.createUnion(
+          List(Schema.createFixed("long", "", null, 6)).asJava
+        )
+      }
+      assert(e.getMessage.contains("Schemas may not be named after 
primitives"))
+    }
+
+    {
+      val e = intercept[Exception] {
+        Schema.createUnion(
+          List(Schema.createFixed("bytes", "", null, 6)).asJava
+        )
+      }
+      assert(e.getMessage.contains("Schemas may not be named after 
primitives"))
+    }
+  }
+
+  test("SPARK-27858 Union type: More than one non-null type") {
+    Seq(true, false).foreach { isStableUnionMember =>
+      withTempDir { dir =>
+        val complexNullUnionType = Schema.createUnion(
+          List(Schema.create(Type.INT), Schema.create(Type.NULL), 
Schema.create(Type.STRING))
+            .asJava
+        )
+        val fields =
+          Seq(new Field("field1", complexNullUnionType, "doc", 
null.asInstanceOf[AnyVal])).asJava
+        val schema = Schema.createRecord("name", "docs", "namespace", false)
+        schema.setFields(fields)
+        val datumWriter = new GenericDatumWriter[GenericRecord](schema)
+        val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
+        dataFileWriter.create(schema, new File(s"$dir.avro"))
+        val avroRec = new GenericData.Record(schema)
+        avroRec.put("field1", 42)
+        dataFileWriter.append(avroRec)
+        val avroRec2 = new GenericData.Record(schema)
+        avroRec2.put("field1", "Alice")
+        dataFileWriter.append(avroRec2)
+        dataFileWriter.flush()
+        dataFileWriter.close()
+
+        val df = spark
+          .read
+          .format("avro")
+          .option(AvroOptions.STABLE_ID_FOR_UNION_TYPE, isStableUnionMember)
+          .load(s"$dir.avro")
+        if (isStableUnionMember) {
+          assert(df.schema === StructType.fromDDL(
+            "field1 struct<member_int: int, member_string: string>"))
+        } else {
+          assert(df.schema === StructType.fromDDL("field1 struct<member0: int, 
member1: string>"))
+        }
+        assert(df.collect().toSet == Set(Row(Row(42, null)), Row(Row(null, 
"Alice"))))
+      }
     }
   }
 
@@ -2353,7 +2528,7 @@ abstract class AvroSuite
   }
 
   test("SPARK-40667: validate Avro Options") {
-    assert(AvroOptions.getAllOptions.size == 9)
+    assert(AvroOptions.getAllOptions.size == 10)
     // Please add validation on any new Avro options here
     assert(AvroOptions.isValidOption("ignoreExtension"))
     assert(AvroOptions.isValidOption("mode"))
@@ -2364,6 +2539,7 @@ abstract class AvroSuite
     assert(AvroOptions.isValidOption("recordNamespace"))
     assert(AvroOptions.isValidOption("positionalFieldMatching"))
     assert(AvroOptions.isValidOption("datetimeRebaseMode"))
+    assert(AvroOptions.isValidOption("enableStableIdentifiersForUnionType"))
   }
 }
 
diff --git a/docs/sql-data-sources-avro.md b/docs/sql-data-sources-avro.md
index 25c1fa30ed9..977886a6f34 100644
--- a/docs/sql-data-sources-avro.md
+++ b/docs/sql-data-sources-avro.md
@@ -321,7 +321,13 @@ Data source options of Avro can be set via:
     <td>read and write</td>
     <td>3.2.0</td>
   </tr>
-</table>
+  <tr>
+    <td><code>enableStableIdentifiersForUnionType</code></td>
+    <td>false</td>
+    <td>If it is set to true, Avro schema is deserialized into Spark SQL 
schema, and the Avro Union type is transformed into a structure where the field 
names remain consistent with their respective types. The resulting field names 
are converted to lowercase, e.g. member_int or member_string. If two 
user-defined type names or a user-defined type name and a built-in type name 
are identical regardless of case, an exception will be raised. However, in 
other cases, the field names can be uni [...]
+    <td>read</td>
+    <td>3.5.0</td>
+  </tr></table>
 
 ## Configuration
 Configuration of Avro can be done using the `setConf` method on SparkSession 
or by running `SET key=value` commands using SQL.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to