Re: Converting RelationalGroupedDataSet to DataFrame

2021-02-07 Thread Stephane Verlet

Once you have a RelationalGroupedDataset, you can use agg() to perform
group-wide operations such as max, sum, etc., or even a custom aggregator:

df.groupBy(...).agg(sum(col(...)))

That will return a DataFrame with your groupBy columns and the results of the
aggregation.
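
For example (a minimal sketch, not tested, using the eventDF and deviceDF
names from your message below; the count/max calls are only placeholders for
whatever your business logic actually needs):

import org.apache.spark.sql.functions.{count, max}

// one row per device_id, with the groupBy column plus the aggregated columns
val result = eventDF
  .join(deviceDF, "device_id")
  .groupBy("device_id")
  .agg(
    count("e_id").as("event_count"),
    max("d_ts").as("latest_state_change")
  )

result.show()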
Stephane


Converting RelationalGroupedDataSet to DataFrame

2021-02-06 Thread Soheil Pourbafrani
Hi,

In my problem, I need to group the DataFrame, apply business logic to each
group, and finally emit a new DataFrame based on that. To describe it in
detail, there is a device DataFrame that contains the timestamps of when each
device was turned on (on) or turned off (off).

+---------+-----+-------------------+
|device_id|state|d_ts               |
+---------+-----+-------------------+
|1        |off  |2020-09-10 16:14:58|
|1        |on   |2020-09-19 16:14:58|
|2        |on   |2020-09-20 16:14:58|
|2        |off  |2020-10-03 16:14:58|
|4        |on   |2020-09-20 16:14:58|
|5        |off  |2020-09-20 16:14:58|
+---------+-----+-------------------+

On the other hand, there is a DataFrame containing event information,
including each event's timestamp and its corresponding device.

+----+---------+-------------------+
|e_id|device_id|e_ts               |
+----+---------+-------------------+
|1   |1        |2020-09-20 16:14:58|
|2   |2        |2020-10-08 09:19:55|
|3   |4        |2020-11-01 12:15:37|
|4   |5        |2020-10-08 01:35:08|
+----+---------+-------------------+
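
For reference, the two sample DataFrames above can be built like this (a
minimal sketch; it assumes an active SparkSession named spark, e.g. in
spark-shell):

import org.apache.spark.sql.functions.to_timestamp
import spark.implicits._

// device on/off history
val deviceDF = Seq(
  (1, "off", "2020-09-10 16:14:58"),
  (1, "on",  "2020-09-19 16:14:58"),
  (2, "on",  "2020-09-20 16:14:58"),
  (2, "off", "2020-10-03 16:14:58"),
  (4, "on",  "2020-09-20 16:14:58"),
  (5, "off", "2020-09-20 16:14:58")
).toDF("device_id", "state", "d_ts")
  .withColumn("d_ts", to_timestamp($"d_ts"))

// events with their timestamps
val eventDF = Seq(
  (1, 1, "2020-09-20 16:14:58"),
  (2, 2, "2020-10-08 09:19:55"),
  (3, 4, "2020-11-01 12:15:37"),
  (4, 5, "2020-10-08 01:35:08")
).toDF("e_id", "device_id", "e_ts")
  .withColumn("e_ts", to_timestamp($"e_ts"))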

The following is an example of joining the two DataFrames on device_id:

+---------+----+-------------------+-----+-------------------+
|device_id|e_id|e_ts               |state|d_ts               |
+---------+----+-------------------+-----+-------------------+
|1        |1   |2020-09-20 16:14:58|off  |2020-09-10 16:14:58|
|1        |1   |2020-09-20 16:14:58|on   |2020-09-19 16:14:58|
|2        |2   |2020-10-08 09:19:55|on   |2020-09-20 16:14:58|
|2        |2   |2020-10-08 09:19:55|off  |2020-10-03 16:14:58|
|4        |3   |2020-11-01 12:15:37|on   |2020-09-20 16:14:58|
|5        |4   |2020-10-08 01:35:08|off  |2020-09-20 16:14:58|
+---------+----+-------------------+-----+-------------------+

What I finally need is the information for the events that happened while
their corresponding device was on. For example, in the table above, event_id 1
is valid because it happened on 2020-09-20 16:14:58 and its device had been on
since 2020-09-19 16:14:58, while event_id 2 is not valid because its device
was turned off on 2020-10-03 16:14:58 and never turned on again, and so on.
This results in the following table:

+---------+----+-------------------+
|device_id|e_id|e_ts               |
+---------+----+-------------------+
|1        |1   |2020-09-20 16:14:58|
|4        |3   |2020-11-01 12:15:37|
+---------+----+-------------------+

I did the following to group the joined table based on the devices:

val grouped = eventDF
  .join(deviceDF, "device_id")
  .groupBy("device_id")

which results in a RelationalGroupedDataset. Now I need to apply the logic to
each group and emit the resulting DataFrame, but I couldn't find a way to do
that. I checked UDAFs, but they don't seem to work for my case. I know how to
solve this using the RDD API, but I want to find a *Column API* approach.
Any help or suggestion would be appreciated.
Thanks