Hi all
Currently, Flink supports only a limited set of window functions, such as
TUMBLE, HOP, and SESSION. They are useful in many cases, but sometimes they
cannot meet business requirements that need more complicated window logic.
Here are some requirements that cannot be satisfied by the current window
functions; all of them are on keyed streams:
In an ad system, we want to accumulate the show count of each ad, and we need a
fixed time window, such as 1 hour, to aggregate the result. But the window for
an ad must not start before some specified action on that ad happens, so each
ad has a different window range.
A more complicated scenario: for each ad, we start the window after some
specified event happens, but we need a global window to accumulate some
metrics of that ad. Each time a new metric event for the ad comes in, the
global window fires. In addition, the global window fires periodically every
hour to report the current status to downstream consumers, even if no new ad
events have arrived. The emitted result can be thought of as (ad_id,
window_start_time, current_time, metric_number). A downstream consumer may be a
strategy rule engine that takes action according to the result. Furthermore, to
avoid OOM issues from the global window state accumulating indefinitely, we can
send a purge message in some special format to the stream so that the window
function purges the global window once we no longer need that ad.
These are classic use cases, and they cannot be supported purely in SQL right
now. Supporting customized window UDFs would be very helpful and would greatly
enrich the functionality of Flink SQL.
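To make the second scenario concrete, here is a minimal plain-Java model of the per-ad behavior described above. It does not use any Flink API. The class name, the "start" and "purge" event names, and the choice not to emit a row on the start event itself are all assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of the per-ad global window; every name here is hypothetical. */
class AdGlobalWindowSketch {
    /** One emitted row: (ad_id, window_start_time, current_time, metric_number). */
    static final class Result {
        final String adId;
        final long windowStartTime;
        final long currentTime;
        final long metricNumber;

        Result(String adId, long windowStartTime, long currentTime, long metricNumber) {
            this.adId = adId;
            this.windowStartTime = windowStartTime;
            this.currentTime = currentTime;
            this.metricNumber = metricNumber;
        }
    }

    // Open windows, keyed by ad id. Each value holds {windowStartTime, runningSum}.
    private final Map<String, long[]> open = new HashMap<>();
    private final List<Result> emitted = new ArrayList<>();

    /** Handles one event of the keyed stream. */
    void onEvent(String adId, String eventName, long value, long now) {
        if ("start".equals(eventName)) {
            // Assumed convention: the first "start" event opens the window for this ad.
            if (!open.containsKey(adId)) {
                open.put(adId, new long[] {now, 0L});
            }
            return;
        }
        if ("purge".equals(eventName)) {
            // Assumed convention: a "purge" event drops the state to bound memory.
            open.remove(adId);
            return;
        }
        long[] window = open.get(adId);
        if (window == null) {
            return; // the window has not started yet, so the metric is ignored
        }
        window[1] += value;
        emitted.add(new Result(adId, window[0], now, window[1])); // fire on every metric
    }

    /** Periodic (for example hourly) firing: reports every open window, even if idle. */
    void onTimer(long now) {
        for (Map.Entry<String, long[]> e : open.entrySet()) {
            long[] window = e.getValue();
            emitted.add(new Result(e.getKey(), window[0], now, window[1]));
        }
    }

    List<Result> emitted() {
        return emitted;
    }
}
```

In this model, metrics that arrive before the start event are dropped, every later metric produces one row, the timer produces one row per open ad, and nothing is emitted for an ad after it is purged.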
Public Interfaces
We need to extend the current SQL syntax to support the definition of window
UDFs.
create [TEMPORARY|TEMPORARY SYSTEM] window function [IF NOT EXISTS]
[catalog_name.][db_name.]function_name AS identifier (col_name col_type, ....)
TEMPORARY
Creates a temporary catalog function that has catalog and database namespaces
and overrides catalog functions.
TEMPORARY SYSTEM
Creates a temporary system function that has no namespace and overrides
built-in functions.
IF NOT EXISTS
If the function already exists, nothing will happen.
IDENTIFIER
The fully qualified class name of the window function.
col_name col_type...
The result columns emitted by the window.
For example:
create temporary window function ad_sum_window as
'com.examples.AdSumWindowFunction';
select
ad_id,
ad_sum_window_window_start_time,
ad_sum_window_current_time,
ad_sum_window_sum_value as convert_cnt
from event_streams
group by
ad_id, ad_sum_window('start', convert_cnt, interval '1' hour)
ad_sum_window is a customized window function defined in
com.examples.AdSumWindowFunction. A detailed interface design for this new
kind of function will be introduced later. This window function waits for an
event with event_name="start" in event_streams before it begins accumulating
the convert_cnt metric. The window also fires periodically, every 1 hour, to
report the current status. For now, all we need to know is that the columns
produced by ad_sum_window can be inferred from the class definition. For
example, if ad_sum_window produces (window_start_time timestamp, current_time
timestamp, sum_value bigint), then Flink automatically exposes them as
<udf_name>_<column_name> in the select list. This is a little tricky, but in
FLIP-145 some select columns such as window_start and window_end are also
generated from the window semantics. It would be great to hear any other good
ideas for this.
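As a small illustration of the <udf_name>_<column_name> rule, the sketch below derives the select-list names from a function name and its produced columns. The class and method names are hypothetical, and this is not how the planner would necessarily implement it.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that applies the <udf_name>_<column_name> naming rule. */
class WindowColumnNaming {
    /** Returns one select-list column name per column produced by the window function. */
    static List<String> exposedColumns(String udfName, List<String> producedColumns) {
        List<String> names = new ArrayList<>();
        for (String column : producedColumns) {
            names.add(udfName + "_" + column);
        }
        return names;
    }
}
```

For ad_sum_window with the columns window_start_time, current_time, and sum_value, this yields the three names used in the select list of the example query.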
Proposed Changes
A new Calcite syntax to create window functions.
A new user-defined window function design, similar to SourceFunction or
ScalarFunction. The following logic can be defined in it:
Customized window type
Window assigner
Trigger
Produced result format
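
One possible shape for such an interface, covering the four items above, is sketched here. This is only a starting point for discussion: none of these types or method names exist in Flink, and the toy implementation uses plain String event names and a long[] as its window state to keep the example self-contained.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical contract for a user-defined window function; not an existing Flink API. */
interface WindowFunctionSketch<IN, ACC> {
    /** What the runtime should do with the window after a callback. */
    enum TriggerResult { CONTINUE, FIRE, PURGE }

    /** Window assigner: decides whether this element opens a window for its key. */
    boolean opensWindow(IN element);

    /** Customized window type: the state a window carries from its start time on. */
    ACC createWindow(long startTime);

    /** Trigger, element side: updates the state and says whether to fire or purge. */
    TriggerResult onElement(IN element, ACC window);

    /** Trigger, time side: interval of the periodic firing, in milliseconds. */
    long fireIntervalMillis();

    /** Produced result format: column names from which the schema could be inferred. */
    List<String> producedColumns();

    /** Produced result format: one row whose fields match producedColumns(). */
    Object[] emit(ACC window, long currentTime);
}

/** Toy implementation: counts elements in a window stored as {startTime, count}. */
class CountingWindowFunction implements WindowFunctionSketch<String, long[]> {
    @Override
    public boolean opensWindow(String element) {
        return "start".equals(element);
    }

    @Override
    public long[] createWindow(long startTime) {
        return new long[] {startTime, 0L};
    }

    @Override
    public TriggerResult onElement(String element, long[] window) {
        if ("purge".equals(element)) {
            return TriggerResult.PURGE;
        }
        window[1]++;
        return TriggerResult.FIRE;
    }

    @Override
    public long fireIntervalMillis() {
        return 3_600_000L; // 1 hour
    }

    @Override
    public List<String> producedColumns() {
        return Arrays.asList("window_start_time", "current_time", "sum_value");
    }

    @Override
    public Object[] emit(long[] window, long currentTime) {
        return new Object[] {window[0], currentTime, window[1]};
    }
}
```

Keeping the produced column names on the function itself is what would let the planner infer the result columns at validation time, which is the open question raised for the select list.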
Thanks
Suhan