This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d49265a170fb [SPARK-46930][SQL] Add support for a custom prefix for 
Union type fields in Avro
d49265a170fb is described below

commit d49265a170fb7bb06471d97f4483139529939ecd
Author: Ivan Sadikov <ivan.sadi...@databricks.com>
AuthorDate: Wed Jan 31 08:39:46 2024 -0800

    [SPARK-46930][SQL] Add support for a custom prefix for Union type fields in 
Avro
    
    ### What changes were proposed in this pull request?
    
    This PR enhances stable ids functionality in Avro by allowing users to 
configure a custom prefix for Union type member fields when 
`enableStableIdentifiersForUnionType` is enabled.
    
    Without the patch, the fields are generated with `member_` prefix, e.g. 
`member_int`, `member_string`. This could become difficult to change for 
complex schemas.
    
    The solution is to add a new option `stableIdentifierPrefixForUnionType` 
which defaults to `member_` and allows users to configure whatever prefix they 
require, e.g. `member`, `tmp_`, or even an empty string.
    
    ### Why are the changes needed?
    
    Allows to customise the prefix of stable ids in Avro without the need to 
rename all of the columns which could be cumbersome for complex schemas.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. The PR adds a new option in Avro: `stableIdentifierPrefixForUnionType`.
    
    ### How was this patch tested?
    
    Existing tests + a new unit test to verify different prefixes.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #44964 from sadikovi/SPARK-46930.
    
    Authored-by: Ivan Sadikov <ivan.sadi...@databricks.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../apache/spark/sql/avro/AvroDataToCatalyst.scala | 12 +++--
 .../apache/spark/sql/avro/AvroDeserializer.scala   | 12 +++--
 .../org/apache/spark/sql/avro/AvroFileFormat.scala |  3 +-
 .../org/apache/spark/sql/avro/AvroOptions.scala    |  6 +++
 .../org/apache/spark/sql/avro/AvroUtils.scala      |  5 +-
 .../apache/spark/sql/avro/SchemaConverters.scala   | 58 +++++++++++++++-------
 .../sql/v2/avro/AvroPartitionReaderFactory.scala   |  3 +-
 .../sql/avro/AvroCatalystDataConversionSuite.scala |  7 +--
 .../apache/spark/sql/avro/AvroFunctionsSuite.scala |  3 +-
 .../apache/spark/sql/avro/AvroRowReaderSuite.scala |  3 +-
 .../org/apache/spark/sql/avro/AvroSerdeSuite.scala |  3 +-
 .../org/apache/spark/sql/avro/AvroSuite.scala      | 54 +++++++++++++++-----
 docs/sql-data-sources-avro.md                      | 10 +++-
 13 files changed, 133 insertions(+), 46 deletions(-)

diff --git 
a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala
 
b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala
index 9f31a2db55a5..7d80998d96eb 100644
--- 
a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala
+++ 
b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala
@@ -40,7 +40,9 @@ private[sql] case class AvroDataToCatalyst(
 
   override lazy val dataType: DataType = {
     val dt = SchemaConverters.toSqlType(
-      expectedSchema, avroOptions.useStableIdForUnionType).dataType
+      expectedSchema,
+      avroOptions.useStableIdForUnionType,
+      avroOptions.stableIdPrefixForUnionType).dataType
     parseMode match {
       // With PermissiveMode, the output Catalyst row might contain columns of 
null values for
       // corrupt records, even if some of the columns are not nullable in the 
user-provided schema.
@@ -62,8 +64,12 @@ private[sql] case class AvroDataToCatalyst(
   @transient private lazy val reader = new 
GenericDatumReader[Any](actualSchema, expectedSchema)
 
   @transient private lazy val deserializer =
-    new AvroDeserializer(expectedSchema, dataType,
-      avroOptions.datetimeRebaseModeInRead, 
avroOptions.useStableIdForUnionType)
+    new AvroDeserializer(
+      expectedSchema,
+      dataType,
+      avroOptions.datetimeRebaseModeInRead,
+      avroOptions.useStableIdForUnionType,
+      avroOptions.stableIdPrefixForUnionType)
 
   @transient private var decoder: BinaryDecoder = _
 
diff --git 
a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
 
b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 9e10fac8bb55..139c45adb442 100644
--- 
a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ 
b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -50,20 +50,23 @@ private[sql] class AvroDeserializer(
     positionalFieldMatch: Boolean,
     datetimeRebaseSpec: RebaseSpec,
     filters: StructFilters,
-    useStableIdForUnionType: Boolean) {
+    useStableIdForUnionType: Boolean,
+    stableIdPrefixForUnionType: String) {
 
   def this(
       rootAvroType: Schema,
       rootCatalystType: DataType,
       datetimeRebaseMode: String,
-      useStableIdForUnionType: Boolean) = {
+      useStableIdForUnionType: Boolean,
+      stableIdPrefixForUnionType: String) = {
     this(
       rootAvroType,
       rootCatalystType,
       positionalFieldMatch = false,
       RebaseSpec(LegacyBehaviorPolicy.withName(datetimeRebaseMode)),
       new NoopFilters,
-      useStableIdForUnionType)
+      useStableIdForUnionType,
+      stableIdPrefixForUnionType)
   }
 
   private lazy val decimalConversions = new DecimalConversion()
@@ -124,7 +127,8 @@ private[sql] class AvroDeserializer(
     val incompatibleMsg = errorPrefix +
         s"schema is incompatible (avroType = $avroType, sqlType = 
${catalystType.sql})"
 
-    val realDataType = SchemaConverters.toSqlType(avroType, 
useStableIdForUnionType).dataType
+    val realDataType = SchemaConverters.toSqlType(
+      avroType, useStableIdForUnionType, stableIdPrefixForUnionType).dataType
 
     (avroType.getType, catalystType) match {
       case (NULL, NullType) => (updater, ordinal, _) =>
diff --git 
a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala 
b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
index 7b0292df43c2..2792edaea284 100755
--- 
a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
+++ 
b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
@@ -142,7 +142,8 @@ private[sql] class AvroFileFormat extends FileFormat
             parsedOptions.positionalFieldMatching,
             datetimeRebaseMode,
             avroFilters,
-            parsedOptions.useStableIdForUnionType)
+            parsedOptions.useStableIdForUnionType,
+            parsedOptions.stableIdPrefixForUnionType)
           override val stopPosition = file.start + file.length
 
           override def hasNext: Boolean = hasNextRow
diff --git 
a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala 
b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
index a0db82f98716..4332904339f1 100644
--- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
+++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
@@ -133,6 +133,9 @@ private[sql] class AvroOptions(
 
   val useStableIdForUnionType: Boolean =
     parameters.get(STABLE_ID_FOR_UNION_TYPE).map(_.toBoolean).getOrElse(false)
+
+  val stableIdPrefixForUnionType: String = parameters
+    .getOrElse(STABLE_ID_PREFIX_FOR_UNION_TYPE, "member_")
 }
 
 private[sql] object AvroOptions extends DataSourceOptions {
@@ -164,4 +167,7 @@ private[sql] object AvroOptions extends DataSourceOptions {
   // type name are identical regardless of case, an exception will be raised. 
However, in other
   // cases, the field names can be uniquely identified.
   val STABLE_ID_FOR_UNION_TYPE = 
newOption("enableStableIdentifiersForUnionType")
+  // When STABLE_ID_FOR_UNION_TYPE is enabled, the option allows to configure 
the prefix for fields
+  // of Avro Union type.
+  val STABLE_ID_PREFIX_FOR_UNION_TYPE = 
newOption("stableIdentifierPrefixForUnionType")
 }
diff --git 
a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala 
b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
index 05562c913b19..c1365d1b5ae1 100644
--- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
+++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
@@ -61,7 +61,10 @@ private[sql] object AvroUtils extends Logging {
           new 
FileSourceOptions(CaseInsensitiveMap(options)).ignoreCorruptFiles)
       }
 
-    SchemaConverters.toSqlType(avroSchema, 
parsedOptions.useStableIdForUnionType).dataType match {
+    SchemaConverters.toSqlType(
+      avroSchema,
+      parsedOptions.useStableIdForUnionType,
+      parsedOptions.stableIdPrefixForUnionType).dataType match {
       case t: StructType => Some(t)
       case _ => throw new RuntimeException(
         s"""Avro schema cannot be converted to a Spark SQL StructType:
diff --git 
a/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
 
b/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
index 00fb32794e3a..387526d40f68 100644
--- 
a/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
+++ 
b/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
@@ -51,8 +51,11 @@ object SchemaConverters {
    *
    * @since 4.0.0
    */
-  def toSqlType(avroSchema: Schema, useStableIdForUnionType: Boolean): 
SchemaType = {
-    toSqlTypeHelper(avroSchema, Set.empty, useStableIdForUnionType)
+  def toSqlType(
+      avroSchema: Schema,
+      useStableIdForUnionType: Boolean,
+      stableIdPrefixForUnionType: String): SchemaType = {
+    toSqlTypeHelper(avroSchema, Set.empty, useStableIdForUnionType, 
stableIdPrefixForUnionType)
   }
   /**
    * Converts an Avro schema to a corresponding Spark SQL schema.
@@ -60,12 +63,17 @@ object SchemaConverters {
    * @since 2.4.0
    */
   def toSqlType(avroSchema: Schema): SchemaType = {
-    toSqlType(avroSchema, false)
+    toSqlType(avroSchema, false, "")
   }
 
   @deprecated("using toSqlType(..., useStableIdForUnionType: Boolean) 
instead", "4.0.0")
   def toSqlType(avroSchema: Schema, options: Map[String, String]): SchemaType 
= {
-    toSqlTypeHelper(avroSchema, Set.empty, 
AvroOptions(options).useStableIdForUnionType)
+    val avroOptions = AvroOptions(options)
+    toSqlTypeHelper(
+      avroSchema,
+      Set.empty,
+      avroOptions.useStableIdForUnionType,
+      avroOptions.stableIdPrefixForUnionType)
   }
 
   // The property specifies Catalyst type of the given field
@@ -74,7 +82,8 @@ object SchemaConverters {
   private def toSqlTypeHelper(
       avroSchema: Schema,
       existingRecordNames: Set[String],
-      useStableIdForUnionType: Boolean): SchemaType = {
+      useStableIdForUnionType: Boolean,
+      stableIdPrefixForUnionType: String): SchemaType = {
     avroSchema.getType match {
       case INT => avroSchema.getLogicalType match {
         case _: Date => SchemaType(DateType, nullable = false)
@@ -127,7 +136,11 @@ object SchemaConverters {
         }
         val newRecordNames = existingRecordNames + avroSchema.getFullName
         val fields = avroSchema.getFields.asScala.map { f =>
-          val schemaType = toSqlTypeHelper(f.schema(), newRecordNames, 
useStableIdForUnionType)
+          val schemaType = toSqlTypeHelper(
+            f.schema(),
+            newRecordNames,
+            useStableIdForUnionType,
+            stableIdPrefixForUnionType)
           StructField(f.name, schemaType.dataType, schemaType.nullable)
         }
 
@@ -137,14 +150,15 @@ object SchemaConverters {
         val schemaType = toSqlTypeHelper(
           avroSchema.getElementType,
           existingRecordNames,
-          useStableIdForUnionType)
+          useStableIdForUnionType,
+          stableIdPrefixForUnionType)
         SchemaType(
           ArrayType(schemaType.dataType, containsNull = schemaType.nullable),
           nullable = false)
 
       case MAP =>
         val schemaType = toSqlTypeHelper(avroSchema.getValueType,
-          existingRecordNames, useStableIdForUnionType)
+          existingRecordNames, useStableIdForUnionType, 
stableIdPrefixForUnionType)
         SchemaType(
           MapType(StringType, schemaType.dataType, valueContainsNull = 
schemaType.nullable),
           nullable = false)
@@ -154,18 +168,22 @@ object SchemaConverters {
           // In case of a union with null, eliminate it and make a recursive 
call
           val remainingUnionTypes = AvroUtils.nonNullUnionBranches(avroSchema)
           if (remainingUnionTypes.size == 1) {
-            toSqlTypeHelper(remainingUnionTypes.head, existingRecordNames, 
useStableIdForUnionType)
-              .copy(nullable = true)
+            toSqlTypeHelper(
+              remainingUnionTypes.head,
+              existingRecordNames,
+              useStableIdForUnionType,
+              stableIdPrefixForUnionType).copy(nullable = true)
           } else {
             toSqlTypeHelper(
               Schema.createUnion(remainingUnionTypes.asJava),
               existingRecordNames,
-              useStableIdForUnionType).copy(nullable = true)
+              useStableIdForUnionType,
+              stableIdPrefixForUnionType).copy(nullable = true)
           }
         } else avroSchema.getTypes.asScala.map(_.getType).toSeq match {
           case Seq(t1) =>
             toSqlTypeHelper(avroSchema.getTypes.get(0),
-              existingRecordNames, useStableIdForUnionType)
+              existingRecordNames, useStableIdForUnionType, 
stableIdPrefixForUnionType)
           case Seq(t1, t2) if Set(t1, t2) == Set(INT, LONG) =>
             SchemaType(LongType, nullable = false)
           case Seq(t1, t2) if Set(t1, t2) == Set(FLOAT, DOUBLE) =>
@@ -179,20 +197,26 @@ object SchemaConverters {
             val fieldNameSet : mutable.Set[String] = mutable.Set()
             val fields = avroSchema.getTypes.asScala.zipWithIndex.map {
               case (s, i) =>
-                val schemaType = toSqlTypeHelper(s, existingRecordNames, 
useStableIdForUnionType)
+                val schemaType = toSqlTypeHelper(
+                  s,
+                  existingRecordNames,
+                  useStableIdForUnionType,
+                  stableIdPrefixForUnionType)
 
                 val fieldName = if (useStableIdForUnionType) {
                   // Avro's field name may be case sensitive, so field names 
for two named type
                   // could be "a" and "A" and we need to distinguish them. In 
this case, we throw
                   // an exception.
-                  val temp_name = 
s"member_${s.getName.toLowerCase(Locale.ROOT)}"
-                  if (fieldNameSet.contains(temp_name)) {
+                  // Stable id prefix can be empty so the name of the field 
can be just the type.
+                  val tempFieldName =
+                    
s"${stableIdPrefixForUnionType}${s.getName.toLowerCase(Locale.ROOT)}"
+                  if (fieldNameSet.contains(tempFieldName)) {
                     throw new IncompatibleSchemaException(
                       "Cannot generate stable indentifier for Avro union type 
due to name " +
                       s"conflict of type name ${s.getName}")
                   }
-                  fieldNameSet.add(temp_name)
-                  temp_name
+                  fieldNameSet.add(tempFieldName)
+                  tempFieldName
                 } else {
                   s"member$i"
                 }
diff --git 
a/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala
 
b/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala
index 2c85c1b06739..1083c9916072 100644
--- 
a/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala
+++ 
b/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala
@@ -104,7 +104,8 @@ case class AvroPartitionReaderFactory(
           options.positionalFieldMatching,
           datetimeRebaseMode,
           avroFilters,
-          options.useStableIdForUnionType)
+          options.useStableIdForUnionType,
+          options.stableIdPrefixForUnionType)
         override val stopPosition = partitionedFile.start + 
partitionedFile.length
 
         override def next(): Boolean = hasNextRow
diff --git 
a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala
 
b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala
index 633bbce8df80..388347537a4d 100644
--- 
a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala
+++ 
b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala
@@ -60,7 +60,7 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite
 
     val expected = {
       val avroSchema = new Schema.Parser().parse(schema)
-      SchemaConverters.toSqlType(avroSchema, false).dataType match {
+      SchemaConverters.toSqlType(avroSchema, false, "").dataType match {
         case st: StructType => Row.fromSeq((0 until st.length).map(_ => null))
         case _ => null
       }
@@ -283,14 +283,15 @@ class AvroCatalystDataConversionSuite extends 
SparkFunSuite
       data: GenericData.Record,
       expected: Option[Any],
       filters: StructFilters = new NoopFilters): Unit = {
-    val dataType = SchemaConverters.toSqlType(schema, false).dataType
+    val dataType = SchemaConverters.toSqlType(schema, false, "").dataType
     val deserializer = new AvroDeserializer(
       schema,
       dataType,
       false,
       RebaseSpec(LegacyBehaviorPolicy.CORRECTED),
       filters,
-      false)
+      false,
+      "")
     val deserialized = deserializer.deserialize(data)
     expected match {
       case None => assert(deserialized == None)
diff --git 
a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala
 
b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala
index 9095f1c0831a..d16ddb497320 100644
--- 
a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala
+++ 
b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala
@@ -264,7 +264,8 @@ class AvroFunctionsSuite extends QueryTest with 
SharedSparkSession {
     val avroOptions = AvroOptions(options)
     val avroSchema = avroOptions.schema.get
     val sparkSchema = SchemaConverters
-      .toSqlType(avroSchema, avroOptions.useStableIdForUnionType)
+      .toSqlType(avroSchema, avroOptions.useStableIdForUnionType,
+        avroOptions.stableIdPrefixForUnionType)
       .dataType
       .asInstanceOf[StructType]
 
diff --git 
a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroRowReaderSuite.scala
 
b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroRowReaderSuite.scala
index 7117ef4b21e8..9b3bb929a700 100644
--- 
a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroRowReaderSuite.scala
+++ 
b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroRowReaderSuite.scala
@@ -76,7 +76,8 @@ class AvroRowReaderSuite
           false,
           RebaseSpec(CORRECTED),
           new NoopFilters,
-          false)
+          false,
+          "")
         override val stopPosition = fileSize
 
         override def hasNext: Boolean = hasNextRow
diff --git 
a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSerdeSuite.scala 
b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSerdeSuite.scala
index c6d0398017ef..cbcbc2e7e76a 100644
--- 
a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSerdeSuite.scala
+++ 
b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSerdeSuite.scala
@@ -227,7 +227,8 @@ object AvroSerdeSuite {
         isPositional(matchType),
         RebaseSpec(CORRECTED),
         new NoopFilters,
-        false)
+        false,
+        "")
   }
 
   /**
diff --git 
a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala 
b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index 3d481d1d731d..61d93ef82336 100644
--- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -103,14 +103,16 @@ abstract class AvroSuite
   // Check whether an Avro schema of union type is converted to SQL in an 
expected way, when the
   // stable ID option is on.
   //
-  // @param types           Avro types that contain in an Avro union type
-  // @param expectedSchema  expeted SQL schema, provided in DDL string form
-  // @param fieldsAndRow    A list of rows to be appended to the Avro file and 
the expected
-  // converted SQL rows
+  // @param types             Avro types that contain in an Avro union type
+  // @param expectedSchema    Expected SQL schema, provided in DDL string form
+  // @param fieldsAndRow      A list of rows to be appended to the Avro file 
and the expected
+  //                          converted SQL rows
+  // @param stableIdPrefixOpt Stable id prefix to use for Union type
   private def checkUnionStableId(
       types: List[Schema],
       expectedSchema: String,
-      fieldsAndRow: Seq[(Any, Row)]): Unit = {
+      fieldsAndRow: Seq[(Any, Row)],
+      stableIdPrefixOpt: Option[String] = None): Unit = {
     withTempDir { dir =>
       val unionType = Schema.createUnion(
         types.asJava
@@ -137,11 +139,16 @@ abstract class AvroSuite
       dataFileWriter.flush()
       dataFileWriter.close()
 
-      val df = spark
-        .read.
-        format("avro")
+      var dfReader = spark
+        .read
+        .format("avro")
         .option(AvroOptions.STABLE_ID_FOR_UNION_TYPE, "true")
-        .load(s"$dir.avro")
+
+      stableIdPrefixOpt.foreach { prefix =>
+        dfReader = 
dfReader.option(AvroOptions.STABLE_ID_PREFIX_FOR_UNION_TYPE, prefix)
+      }
+
+      val df = dfReader.load(s"$dir.avro")
       assert(df.schema === StructType.fromDDL("field1 " + expectedSchema))
       assert(df.collect().toSet == fieldsAndRow.map(fr => Row(fr._2)).toSet)
     }
@@ -320,7 +327,7 @@ abstract class AvroSuite
     }
   }
 
-  // The test test Avro option "enableStableIdentifiersForUnionType". It adds 
all types into
+  // The test verifies Avro option "enableStableIdentifiersForUnionType". It 
adds all types into
   // union and validate they are converted to expected SQL field names. The 
test also creates
   // different cases that might cause field name conflicts and see they are 
handled properly.
   test("SPARK-43333: Stable field names when converting Union type") {
@@ -435,6 +442,28 @@ abstract class AvroSuite
     }
   }
 
+  test("SPARK-46930: Use custom prefix for stable ids when converting Union 
type") {
+    // Test default "member_" prefix.
+    checkUnionStableId(
+      List(Type.INT, Type.NULL, Type.STRING).map(Schema.create(_)),
+      "struct<member_int: int, member_string: string>",
+      Seq(
+        (42, Row(42, null)),
+        ("Alice", Row(null, "Alice"))))
+
+    // Test user-configured prefixes.
+    for (prefix <- Seq("tmp_", "tmp", "member", "MEMBER_", "__", "")) {
+      checkUnionStableId(
+        List(Type.INT, Type.NULL, Type.STRING).map(Schema.create(_)),
+        s"struct<${prefix}int: int, ${prefix}string: string>",
+        Seq(
+          (42, Row(42, null)),
+          ("Alice", Row(null, "Alice"))),
+        Some(prefix)
+      )
+    }
+  }
+
   test("SPARK-27858 Union type: More than one non-null type") {
     Seq(true, false).foreach { isStableUnionMember =>
       withTempDir { dir =>
@@ -2146,7 +2175,7 @@ abstract class AvroSuite
 
   private def checkSchemaWithRecursiveLoop(avroSchema: String): Unit = {
     val message = intercept[IncompatibleSchemaException] {
-      SchemaConverters.toSqlType(new Schema.Parser().parse(avroSchema), false)
+      SchemaConverters.toSqlType(new Schema.Parser().parse(avroSchema), false, 
"")
     }.getMessage
 
     assert(message.contains("Found recursive reference in Avro schema"))
@@ -2703,7 +2732,7 @@ abstract class AvroSuite
   }
 
   test("SPARK-40667: validate Avro Options") {
-    assert(AvroOptions.getAllOptions.size == 10)
+    assert(AvroOptions.getAllOptions.size == 11)
     // Please add validation on any new Avro options here
     assert(AvroOptions.isValidOption("ignoreExtension"))
     assert(AvroOptions.isValidOption("mode"))
@@ -2715,6 +2744,7 @@ abstract class AvroSuite
     assert(AvroOptions.isValidOption("positionalFieldMatching"))
     assert(AvroOptions.isValidOption("datetimeRebaseMode"))
     assert(AvroOptions.isValidOption("enableStableIdentifiersForUnionType"))
+    assert(AvroOptions.isValidOption("stableIdentifierPrefixForUnionType"))
   }
 
   test("SPARK-46633: read file with empty blocks") {
diff --git a/docs/sql-data-sources-avro.md b/docs/sql-data-sources-avro.md
index cbc3367e5f85..712d4d3b8cd4 100644
--- a/docs/sql-data-sources-avro.md
+++ b/docs/sql-data-sources-avro.md
@@ -327,7 +327,15 @@ Data source options of Avro can be set via:
     <td>If it is set to true, Avro schema is deserialized into Spark SQL 
schema, and the Avro Union type is transformed into a structure where the field 
names remain consistent with their respective types. The resulting field names 
are converted to lowercase, e.g. member_int or member_string. If two 
user-defined type names or a user-defined type name and a built-in type name 
are identical regardless of case, an exception will be raised. However, in 
other cases, the field names can be uni [...]
     <td>read</td>
     <td>3.5.0</td>
-  </tr></table>
+  </tr>
+  <tr>
+    <td><code>stableIdentifierPrefixForUnionType</code></td>
+    <td>member_</td>
+    <td>When `enableStableIdentifiersForUnionType` is enabled, the option 
allows to configure the prefix for fields of Avro Union type.</td>
+    <td>read</td>
+    <td>4.0.0</td>
+  </tr>
+</table>
 
 ## Configuration
 Configuration of Avro can be done via `spark.conf.set` or by running `SET 
key=value` commands using SQL.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to