Hi,

given is a simple DF:

root
 |-- id1: string (nullable = true)
 |-- id2: string (nullable = true)
 |-- val: string (nullable = true)

I run an UDAF on this DF with groupBy($“id1“,$“id2“).agg(udaf($“val“) as
„valsStruct“).
The aggregates simply stores all val in Set.

The result is:

root
 |-- id1: string (nullable = true)
 |-- id2: integer (nullable = true)
 |-- valsStruct: struct (nullable = true)
 |    |-- vals: array (nullable = true)
 |    |    |-- element: string (containsNull = true)

But i would expect:

root
 |-- id1: string (nullable = true)
 |-- id2: integer (nullable = true)
 |-- vals: array (nullable = true)
 |    |— element: string (containsNull = true)

What I’m doing right now is to add a new columns val with valsStruct.vals
as a value and drop valsStruct afterwards, but i’m quite sure there is a
more elegant way. I tried various implementations of the evaluate method,
but none of those worked for me. Can you tell me what I am missing here?

The implementation of the UDAF:

class AggregateVals extends UserDefinedAggregateFunction {

  def inputSchema: StructType = StructType(Array(
    StructField("val", StringType, true)
  ))

  def bufferSchema: StructType = StructType(Array(
    StructField("vals", ArrayType(StringType, true))
  ))

  def dataType: DataType = bufferSchema

  def deterministic: Boolean = true

  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = Seq[String]()
  }

  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    val existing: Seq[String] = buffer.getSeq[String](0)
    val newBuffer = existing :+ input.getAs[String](0)
    buffer(0) = newBuffer
  }

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getSeq[String](0) ++ buffer2.getSeq[String](0)
  }

  def evaluate(buffer: Row): Any = {
    buffer
  }
}

-- 
Matthias Niehoff | IT-Consultant | Agile Software Factory  | Consulting
codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobil: +49 (0)
172.1702676
www.codecentric.de | blog.codecentric.de | www.meettheexperts.de |
www.more4fi.de

Sitz der Gesellschaft: Solingen | HRB 25917| Amtsgericht Wuppertal
Vorstand: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
Aufsichtsrat: Patric Fedlmeier (Vorsitzender) . Klaus Jäger . Jürgen Schütz

Diese E-Mail einschließlich evtl. beigefügter Dateien enthält vertrauliche
und/oder rechtlich geschützte Informationen. Wenn Sie nicht der richtige
Adressat sind oder diese E-Mail irrtümlich erhalten haben, informieren Sie
bitte sofort den Absender und löschen Sie diese E-Mail und evtl.
beigefügter Dateien umgehend. Das unerlaubte Kopieren, Nutzen oder Öffnen
evtl. beigefügter Dateien sowie die unbefugte Weitergabe dieser E-Mail ist
nicht gestattet

Reply via email to