cloud-fan commented on code in PR #45934:
URL: https://github.com/apache/spark/pull/45934#discussion_r1565141180


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/variantExpressions.scala:
##########
@@ -489,3 +492,67 @@ object SchemaOfVariant {
   def mergeSchema(t1: DataType, t2: DataType): DataType =
     JsonInferSchema.compatibleType(t1, t2, VariantType)
 }
+
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(v) - Returns the merged schema in the SQL format of a 
variant column.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(parse_json(j)) FROM VALUES ('1'), ('2'), ('3') AS tab(j);
+       BIGINT
+      > SELECT _FUNC_(parse_json(j)) FROM VALUES ('{"a": 1}'), ('{"b": 
true}'), ('{"c": 1.23}') AS tab(j);
+       STRUCT<a: BIGINT, b: BOOLEAN, c: DECIMAL(3,2)>
+  """,
+  since = "4.0.0",
+  group = "variant_funcs")
+// scalastyle:on line.size.limit
+case class SchemaOfVariantAgg(
+    child: Expression,
+    override val mutableAggBufferOffset: Int,
+    override val inputAggBufferOffset: Int)
+    extends TypedImperativeAggregate[DataType]
+    with ExpectsInputTypes
+    with QueryErrorsBase
+    with UnaryLike[Expression] {
+  def this(child: Expression) = this(child, 0, 0)
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(VariantType)
+
+  override def dataType: DataType = StringType
+
+  override def nullable: Boolean = false
+
+  override def createAggregationBuffer(): DataType = NullType
+
+  override def update(buffer: DataType, input: InternalRow): DataType = {
+    val inputVariant = child.eval(input).asInstanceOf[VariantVal]
+    if (inputVariant != null) {
+      val v = new Variant(inputVariant.getValue, inputVariant.getMetadata)
+      SchemaOfVariant.mergeSchema(buffer, SchemaOfVariant.schemaOf(v))
+    } else {
+      buffer
+    }
+  }
+
+  override def merge(buffer: DataType, input: DataType): DataType =
+    SchemaOfVariant.mergeSchema(buffer, input)
+
+  override def eval(buffer: DataType): Any = UTF8String.fromString(buffer.sql)
+
+  override def serialize(buffer: DataType): Array[Byte] = 
buffer.sql.getBytes("UTF-8")

Review Comment:
   Can we make sure the `.sql` method won't truncate the string? BTW, why not 
use `.json`? JSON is more of a serde format, and we also store the table schema 
as a JSON string in the Hive metastore, rather than as a SQL string.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to