radu created FLINK-6081:
---------------------------

Summary: Offset/Fetch support for SQL Streaming
Key: FLINK-6081
URL: https://issues.apache.org/jira/browse/FLINK-6081
Project: Flink
Issue Type: Bug
Components: Table API & SQL
Reporter: radu
Time target: Proc Time

The main purpose of OFFSET/FETCH is pagination support. In a streaming context, OFFSET and FETCH make sense within the scope of certain window constructs, because they refer to buffered data from the stream (their main use is to restrict the output shown at a given moment). Therefore, they should be applied to the output of the window types supported by the ORDER BY clause. Moreover, in accordance with SQL best practices, they can only be used together with an ORDER BY clause.

SQL targeted query examples:
----------------------------

Window defined by the GROUP BY clause

```
Q1: SELECT a FROM stream1 GROUP BY HOP(proctime, INTERVAL '1' HOUR, INTERVAL '3' HOUR) ORDER BY b OFFSET n ROWS
```

Window defined by time boundaries in the WHERE clause

```
Q2: SELECT a FROM stream1 WHERE procTime() BETWEEN current_timestamp - INTERVAL '1' HOUR AND current_timestamp ORDER BY b OFFSET n ROWS
```

~~Window defined as sliding windows (aggregates)~~ (not supported)

```
Q3: SELECT SUM(a) OVER (ORDER BY proctime RANGE INTERVAL '1' HOUR PRECEDING b OFFSET n ROWS) FROM stream1
```

Comment: Supporting OFFSET over sliding windows (within the window) does not make sense, because the main purpose of OFFSET/FETCH is pagination support. This functionality should therefore only be supported in relation to the output of a query. Hence, Q3 will not be supported.

The general grammar (Calcite version) for OFFSET/FETCH with the available parameters is shown below:

```
SELECT […]
  [ ORDER BY orderItem [, orderItem ]* ]
  [ OFFSET start { ROW | ROWS } ]
  [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY ]
```

Description
-----------

OFFSET and FETCH are primarily used for pagination support (i.e., to restrict the output that is shown at some point). They were mainly designed to support displaying content on web pages. Building on this scenario, we can imagine a similar role for OFFSET and FETCH for streams whose contents are displayed via a web page.
In such a scenario, the number of outputs to be displayed would be limited using these operators (probably for pagination and aesthetic reasons). However, since any stream application naturally evolves over time, the operators' output should evolve with the update rate of the application. The presence of both an update rate and a collection of events related to a stream points to window constructs. Therefore, the OFFSET/FETCH functionality is tied to the window mechanisms/boundaries defined by the query: whenever the window is re-triggered, its output is filtered again by cardinality according to the OFFSET/FETCH logic.

Because the primary purpose is pagination (controlling the number of outputs), we limit the usage of OFFSET/FETCH to window constructs related to the output of the query. For this reason, supporting them on sliding windows with aggregates (e.g., the Q3 example) would not make sense. Additionally, an ordering clause is implicitly required, because OFFSET and FETCH refer to positions in an ordering. That is why these functions will only be supported if an ORDER BY clause is present.

Functionality example
---------------------

We illustrate the usage of OFFSET with the following query. The event schema is (a, b).

```
SELECT a FROM stream1 GROUP BY CEIL(proctime TO HOUR) ORDER BY b OFFSET 2 ROWS
```

| Proctime | IngestionTime(Event) | Stream1 | Output |
|---|---|---|---|
| | 10:00:01 | (a1, 7) | |
| | 10:05:00 | (c1, 2) | |
| | 10:12:00 | (b1, 5) | |
| | 10:50:00 | (d1, 2) | |
| 10-11 | | | b1, a1 |
| | 11:03:00 | (a2, 10) | |
| 11-12 | | | nil |
| ... | | | |

In the 10-11 window the events sorted by b are c1, d1, b1, a1; OFFSET 2 skips the first two, so b1 and a1 are emitted. The 11-12 window contains only a2, so nothing is emitted.

Implementation options
----------------------

There are two options to implement the OFFSET/FETCH logic:

1) Within the logic of the window (i.e., sorting window)

Similar to the sorting support (ORDER BY clause), and considering that the SQL operators are associated with window boundaries, the functionality will be implemented within the logic of the window as follows. We extract the window boundaries and the window type from the query logic; these define the type of the window and its triggering policy. The logic of the query (i.e., the sorting of the events) is in turn implemented within the window function. In addition, the logic for filtering the output according to the cardinality constraints of OFFSET/FETCH is added there. With this implementation, the OFFSET/FETCH logic is combined with that of the ORDER BY clause. Since ORDER BY is always required, this combination does not impose any implementation restrictions.

2) Within the logic of a filter/flatMap function (with a state counter for the outputs)

Instead of adding the logic within the window function, the filtering can be done in a standalone operator that only counts outputs and emits those that fall within the range defined by OFFSET/FETCH. To provide this functionality, we need a flatMap function in which we count the results. The OFFSET/FETCH condition would be transposed into the condition of an IF statement, applied based on the position of the output, to decide whether to emit it. Although this option would directly translate the output-filtering logic of the relational SQL operators, the counter would need to be reset in accordance with the triggering of the window, which makes the implementation tedious.

We recommend option 1 for implementation.
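The per-window logic of option 1 can be sketched in plain Java, outside of Flink: sort the events buffered by a fired window on the ORDER BY key, then use a counter to keep only the positions in [offset, offset + fetch). This is a minimal sketch under stated assumptions, not the actual Flink implementation: the class `OffsetFetchSketch`, the `Event` type and `sortAndFilter` are hypothetical names, a negative `fetch` stands for "no FETCH clause", and ties on `b` keep arrival order. `main` replays the two windows from the functionality example.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch of option 1: sorting window function + OFFSET/FETCH counter.
// Plain Java, no Flink dependency; all names here are illustrative assumptions.
public class OffsetFetchSketch {

    // Event schema (a, b) from the functionality example.
    static final class Event {
        final String a;
        final int b;

        Event(String a, int b) {
            this.a = a;
            this.b = b;
        }
    }

    // Called once per fired window: ORDER BY b, then emit only rows whose
    // 0-based index i satisfies offset <= i < offset + fetch.
    // Assumption: fetch < 0 means the query has no FETCH clause.
    static List<String> sortAndFilter(List<Event> window, int offset, int fetch) {
        List<Event> sorted = new ArrayList<>(window);
        // List.sort is stable, so events with equal b keep their arrival order.
        sorted.sort(Comparator.comparingInt(e -> e.b));

        List<String> out = new ArrayList<>();
        int index = 0; // counter for indexing the outputs, local to this window
        for (Event e : sorted) {
            boolean afterOffset = index >= offset;
            boolean withinFetch = fetch < 0 || index < offset + fetch;
            if (afterOffset && withinFetch) {
                out.add(e.a); // SELECT a
            }
            index++;
        }
        return out;
    }

    public static void main(String[] args) {
        // Window 10-11 from the example table.
        List<Event> w1 = List.of(new Event("a1", 7), new Event("c1", 2),
                                 new Event("b1", 5), new Event("d1", 2));
        // Window 11-12 from the example table.
        List<Event> w2 = List.of(new Event("a2", 10));

        System.out.println(sortAndFilter(w1, 2, -1)); // prints [b1, a1]
        System.out.println(sortAndFilter(w2, 2, -1)); // prints []
    }
}
```

Because the counter lives inside a single window invocation, it starts at 0 every time a window fires, so nothing has to be reset when the window is re-triggered; this is the bookkeeping that makes option 2 tedious.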
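Therefore, for option 1 we reuse the ORDER BY implementation entirely and only add:

1) A counter for indexing the outputs
2) An IF condition that emits an output only if its index counter falls within the range defined by OFFSET/FETCH

General logic of OFFSET/FETCH
-----------------------------

```java
// Non-keyed stream: windowAll(...); a keyed stream would use keyBy(...).window(...).
// The window type (sliding/tumbling, time/count) and its boundaries are derived from the query;
// a 1-hour tumbling processing-time window matches the functionality example above.
inputDataStream
    .windowAll(TumblingProcessingTimeWindows.of(Time.hours(1)))
    // .trigger(...)  - use the assigner's default trigger
    // .evictor(...)  - use the default (no evictor)
    // SortAndCountFilter: hypothetical window function combining the ORDER BY sort
    // with the OFFSET/FETCH counter; offset and fetch come from the query.
    .apply(new SortAndCountFilter(offset, fetch));
```

-- 
This message was sent by Atlassian JIRA
(v6.3.15#6346)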