Thanks for getting back Kurt. Yeah this might be an option to try out. I was 
hoping there would be a way to express this directly in the SQL though ☹.

-- Piyush


From: Kurt Young <ykt...@gmail.com>
Date: Tuesday, March 12, 2019 at 2:25 AM
To: Piyush Narang <p.nar...@criteo.com>
Cc: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Expressing Flink array aggregation using Table / SQL API

Hi Piyush,

Could you try to add clientId into your aggregate function, and to track the 
map of <clientId, your_original_aggregation> inside your new aggregate 
function, and assemble what ever result when emit.
The SQL will looks like:

SELECT userId, some_aggregation(clientId, eventType, `timestamp`, dataField)
FROM my_kafka_stream_table
GROUP BY userId,  HOP(`timestamp`, INTERVAL '1' MINUTE, INTERVAL '1' HOUR)

Kurt


On Tue, Mar 12, 2019 at 11:03 AM Piyush Narang 
<p.nar...@criteo.com<mailto:p.nar...@criteo.com>> wrote:
Hi folks,

I’m getting started with Flink and trying to figure out how to express 
aggregating some rows into an array to finally sink data into an 
AppendStreamTableSink.
My data looks something like this:
userId, clientId, eventType, timestamp, dataField

I need to compute some custom aggregations using a UDAF while grouping by 
userId, clientId over a sliding window (10 mins, triggered every 1 min). My 
first attempt is:
SELECT userId, clientId, my_aggregation(eventType, `timestamp`, dataField) as 
custom_aggregated
FROM my_kafka_stream_table
GROUP BY userId, clientId, HOP(`timestamp`, INTERVAL '1' MINUTE, INTERVAL '1' 
HOUR)

This query works as I expect it to. In every time window I end up with inserts 
for unique userId + clientId combinations. What I want to do though, is 
generate a single row per userId in each time window and this is what I’m 
struggling with expressing along with the restriction that I want to sink this 
to an AppendStreamTableSink. I was hoping to do something like this:

SELECT userId, COLLECT(client_custom_aggregated)
FROM
(
  SELECT userId, MAP[clientId, my_aggregation(eventType, `timestamp`, 
dataField) as custom_aggregated] as client_custom_aggregated
  FROM my_kafka_stream_table
  GROUP BY userId, clientId, HOP(`timestamp`, INTERVAL '1' MINUTE, INTERVAL '1' 
HOUR)
) GROUP BY userId

Unfortunately when I try this (and a few other variants), I run into the error, 
“AppendStreamTableSink requires that Table has only insert changes”. Does 
anyone know if there’s a way for me to compute my collect aggregation to 
produce one row per userId for a given time window?

Thanks,

-- Piyush

Reply via email to