Hello Sandeep,

You can pass a Row to a UDAF. Just provide a proper inputSchema to your UDAF.

Check out this example: https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html
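
A minimal sketch of such a UDAF, assuming Spark 2.x's
UserDefinedAggregateFunction API (the column names x/y and the sum logic
are placeholders, not your actual aggregation):

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.functions.struct
import org.apache.spark.sql.types._

// Sums x and y over each group; the single input column is a struct,
// so a whole Row arrives per record.
class SumXY extends UserDefinedAggregateFunction {
  override def inputSchema: StructType = StructType(Seq(
    StructField("row", StructType(Seq(
      StructField("x", IntegerType),
      StructField("y", IntegerType)
    )))
  ))

  override def bufferSchema: StructType = StructType(Seq(
    StructField("sumX", LongType),
    StructField("sumY", LongType)
  ))

  override def dataType: DataType = bufferSchema

  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
    buffer(1) = 0L
  }

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    val r = input.getStruct(0) // the nested Row
    if (r != null) {
      buffer(0) = buffer.getLong(0) + r.getInt(0)
      buffer(1) = buffer.getLong(1) + r.getInt(1)
    }
  }

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }

  override def evaluate(buffer: Row): Any = Row(buffer.getLong(0), buffer.getLong(1))
}

// Usage: wrap the columns of interest into one struct argument.
// df.groupBy($"key").agg(new SumXY()(struct($"x", $"y")))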

Yours,
Tomasz

2017-12-10 11:55 GMT+01:00 Sandip Mehta <sandip.mehta....@gmail.com>:

> Thanks Georg. I have looked at UDAFs based on your suggestion. It looks
> like you can only pass a single column to a UDAF. Is there any way to pass
> the entire Row to the aggregate function?
>
> I want to apply a list of user-defined functions to a given Row object,
> perform the aggregation, and return an aggregated Row object.
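>
> Conceptually, the call I am after would look like this (myAggUDAF is a
> placeholder for such a row-level aggregate):
>
> import org.apache.spark.sql.functions.{col, struct}
>
> df.groupBy($"key").agg(myAggUDAF(struct(df.columns.map(col): _*)))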
>
> Regards
> Sandeep
>
> On Fri, Dec 8, 2017 at 12:47 PM Georg Heiler <georg.kf.hei...@gmail.com>
> wrote:
>
>> You are looking for a UDAF.
>> Sandip Mehta <sandip.mehta....@gmail.com> wrote on Fri, Dec 8, 2017 at
>> 06:20:
>>
>>> Hi,
>>>
>>> I want to group on certain columns and then, for every group, apply a
>>> custom UDF to it. Currently groupBy only allows adding aggregation
>>> functions to a RelationalGroupedDataset.
>>>
>>> For this I was thinking of using groupByKey, which returns a
>>> KeyValueGroupedDataset, and then applying the UDF to every group, but I
>>> have not been able to solve this.
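>>>
>>> What I was trying is roughly this (the (x, y, u, v, z) schema and the
>>> per-group sums are placeholders), but I could not fit my UDF-based
>>> aggregation into it:
>>>
>>> import sparkSession.implicits._
>>>
>>> // Group by the key columns (x, y), then fold each group's rows manually.
>>> val aggregated = df
>>>   .groupByKey(row => (row.getInt(0), row.getInt(1)))
>>>   .mapGroups { case ((x, y), rows) =>
>>>     val sums = rows.foldLeft((0, 0, 0)) { (acc, r) =>
>>>       (acc._1 + r.getInt(2), acc._2 + r.getInt(3), acc._3 + r.getInt(4))
>>>     }
>>>     (x, y, sums._1, sums._2, sums._3)
>>>   }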
>>>
>>> SM
>>>
>>> On Fri, Dec 8, 2017 at 10:29 AM Weichen Xu <weichen...@databricks.com>
>>> wrote:
>>>
>>>> You can groupBy multiple columns on a dataframe, so why do you need such
>>>> a complicated schema?
>>>>
>>>> suppose df schema: (x, y, u, v, z)
>>>>
>>>> df.groupBy($"x", $"y").agg(...)
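>>>>
>>>> For instance, filling in concrete aggregates (sum and avg are arbitrary
>>>> picks, not a recommendation):
>>>>
>>>> import org.apache.spark.sql.functions.{avg, sum}
>>>>
>>>> df.groupBy($"x", $"y").agg(sum($"u"), avg($"z"))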
>>>>
>>>> Is this what you want?
>>>>
>>>> On Fri, Dec 8, 2017 at 11:51 AM, Sandip Mehta <
>>>> sandip.mehta....@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> During my aggregation I end up with the following schema.
>>>>>
>>>>> Row(Row(val1,val2), Row(val1,val2,val3...))
>>>>>
>>>>> val values = Seq(
>>>>>     (Row(10, 11), Row(10, 2, 11)),
>>>>>     (Row(10, 11), Row(10, 2, 11)),
>>>>>     (Row(20, 11), Row(10, 2, 11))
>>>>>   )
>>>>>
>>>>>
>>>>> The 1st tuple element is used to group the relevant records for
>>>>> aggregation. I have used the following to create the dataset.
>>>>>
>>>>> val s = StructType(Seq(
>>>>>   StructField("x", IntegerType, true),
>>>>>   StructField("y", IntegerType, true)
>>>>> ))
>>>>> val s1 = StructType(Seq(
>>>>>   StructField("u", IntegerType, true),
>>>>>   StructField("v", IntegerType, true),
>>>>>   StructField("z", IntegerType, true)
>>>>> ))
>>>>>
>>>>> val ds = sparkSession.sqlContext.createDataset(
>>>>>   sparkSession.sparkContext.parallelize(values)
>>>>> )(Encoders.tuple(RowEncoder(s), RowEncoder(s1)))
>>>>>
>>>>> Is this the correct way of representing this?
>>>>>
>>>>> How do I create the dataset and row encoder for such a use case, so
>>>>> that I can do groupByKey on this?
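>>>>>
>>>>> Something along these lines is what I am after (untested sketch; the
>>>>> result schema and the sumU column are made up):
>>>>>
>>>>> val resultSchema = StructType(Seq(
>>>>>   StructField("x", IntegerType, true),
>>>>>   StructField("y", IntegerType, true),
>>>>>   StructField("sumU", IntegerType, true)
>>>>> ))
>>>>>
>>>>> val grouped = ds.groupByKey(_._1)(RowEncoder(s))
>>>>>   .mapGroups { case (key, pairs) =>
>>>>>     // key is the (x, y) Row; pairs iterates the (key, value) tuples.
>>>>>     Row(key.getInt(0), key.getInt(1), pairs.map(_._2.getInt(0)).sum)
>>>>>   }(RowEncoder(resultSchema))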
>>>>>
>>>>>
>>>>>
>>>>> Regards
>>>>> Sandeep
>>>>>
>>>>
>>>>
