[ https://issues.apache.org/jira/browse/FLINK-6077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
radu updated FLINK-6077:
------------------------
    Attachment: in.png

> Support In/Exists/Except/Any/Some/All for Stream SQL
> -----------------------------------------------------
>
> Key: FLINK-6077
> URL: https://issues.apache.org/jira/browse/FLINK-6077
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Reporter: radu
> Attachments: in.png
>
>
> Time target: Proc Time
> SQL targeted query examples:
> ----------------------------
> With inner query
> Q1. ```SELECT client FROM stream1 WHERE id
> IN
> ((SELECT id FROM stream2 WHERE salary > 4500
> GROUP BY FLOOR(proctime TO HOUR)))```
> Comment: A concrete example for this query is selecting the customers
> whose country is in the list of supplier countries (`SELECT
> customer FROM customers WHERE Country IN (SELECT Country FROM suppliers)`).
> Comment: This implementation depends on the implementation of the inner
> query. The structure can be the same as for inner query support, with the
> difference that the LogicalJoin between the main query and the inner query
> is conditional.
> Comment: The inner query needs a bound, as otherwise it cannot be decided
> when to trigger.
> Comment: If the value is not triggered by the grouping expression then
> the inner query must be triggered based on when that expression changes
> value.
> Comment: Boundaries should be supported over all options: group by
> clauses, windows, or time expressions ([floor/ceil](rowtime/proctime to
> hour)).
> With collection
> Q2. ```SELECT * FROM stream1 WHERE b
> IN
> (5000, 7000, 8000, 9000)```
> Comment: It should be checked whether the DataStreamCalc implementation
> already supports this. If not, it can become a sub-JIRA task to
> extend the DataStreamCalc functionality to implement this conditional
> behavior.
> Comment: A similar functionality can be provided if the collection is a
> table rather than a set of values.
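For Q2 the collection is a fixed list of literals, so the check can be evaluated per record with no state and no window. The plain-Java sketch below only illustrates that semantics; it does not use the Flink API, and the class `InCollectionFilter` and its method `filter` are hypothetical names introduced here, not part of DataStreamCalc.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration of `WHERE b IN (5000, 7000, 8000, 9000)`.
public class InCollectionFilter {

    // The literal list from Q2, held in a set for constant-time membership checks.
    static final Set<Integer> ALLOWED =
            new HashSet<>(Arrays.asList(5000, 7000, 8000, 9000));

    /** Returns, in arrival order, the values of column b that are members of ALLOWED. */
    static List<Integer> filter(List<Integer> bValues) {
        List<Integer> kept = new ArrayList<>();
        for (Integer b : bValues) {
            if (ALLOWED.contains(b)) {
                kept.add(b);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Only 5000 and 8000 appear in the literal list, so only they are kept.
        System.out.println(filter(Arrays.asList(4500, 5000, 8000, 9100)));
    }
}
```

Because each record is decided on its own, this case needs no bound on the stream, unlike the inner-query case in Q1.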
> With table
> ```SELECT client FROM stream1 WHERE id
> IN
> ((SELECT id FROM table1 WHERE stream1.id = table1.id))```
> Comment: This can be a sub-JIRA issue, perhaps within the context of dynamic
> tables, to support joins with tables and filtering operations based on
> the contents of an external table.
> General comments: **Except** is similar in behavior to IN or EXISTS, as
> it filters out outputs of the main stream based on data from a secondary
> stream. The implementation will follow exactly the same logic as for
> IN/EXISTS by filtering the outputs in the join function between the main
> stream and the secondary stream. Additionally, we apply the same
> restrictions to the secondary/inner queries.
> ```SELECT ID, NAME FROM CUSTOMERS LEFT JOIN ORDERS ON CUSTOMERS.ID =
> ORDERS.CUSTOMER_ID
> EXCEPT
> SELECT ID, NAME FROM CUSTOMERS RIGHT JOIN ORDERS ON CUSTOMERS.ID =
> ORDERS.CUSTOMER_ID GROUP BY FLOOR(procTime TO HOUR);```
> Description:
> ------------
> The IN and EXISTS operators are conditional clauses in the WHERE clause
> that check for values in certain collections. The collections on which
> the restriction is based can be static (values or tables) or parts of
> a stream. This JIRA issue is concerned with the latter case of checking
> for values over a stream. In order for this operation to work, the
> stream needs to be bounded such that the result can trigger and the
> collection can be formed. This points to using some boundaries or
> groupings over the sub-query that forms the collection over which IN is
> applied. This should be supported via 3 options as shown below. Each of
> these options can be a sub-JIRA issue.
> 1) Group By clauses that are applied over some monotonic order of the
> stream based on which ranges are defined.
> ` [...] GROUP BY prodId`
> 3) Window clauses to define rolling partitions of the data of the
> stream, which evolve in time.
> ` [...]
WINDOW HOUR AS (RANGE INTERVAL '10' MINUTE TO SECOND(3) PRECEDING);`
> Functionality example
> ---------------------
> We exemplify below the functionality of IN/EXISTS when working with
> streams.
> ```SELECT * FROM stream1 WHERE id IN ((SELECT id2 FROM stream2 WHERE b > 10
> GROUP BY FLOOR(PROCTIME TO HOUR)))```
> Note: The inner query triggers only once an hour. During the next hour, the
> result of the previous hour from the inner query is the one used to
> filter the results of the main query as they arrive. This is also consistent
> with how the inner queries are translated (see inner queries).
> ||IngestionTime(Event)||Stream1||Stream 2||Output||
> |10:00:01| Id1,10| |nil|
> |10:02:00| |Id2,2| |
> |11:25:00| |Id3,15| |
> |12:03:00| Id2,15| |nil|
> |12:05:00| Id3,11| |Id3,11|
> |12:06:00| |Id2,30| |
> |12:07:00| |Id3,2| |
> |12:09:00| Id2,17| |nil|
> |12:10:00| Id3,20| |Id3,20|
> |...|
> Implementation option
> ---------------------
> Considering that the query only makes sense in the context of 1) window
> boundaries and 2) sub-queries that extract collections of data, the
> main design is based on the inner query implementation with the
> following modifications. (As a recap, the inner query is implemented with
> a special Join [left type with always true condition] between the main
> stream and the output of the inner query, which is passed through a
> single value selection aggregation):
> 1) The condition for outputting a result from the LogicalJoin is no longer
> always true. Instead, the condition is evaluated within the
> window function by checking that the input from the main stream is
> within the collection from the inner query.
> 2) The check is done specifically based on the type of function used (IN,
> ANY, SOME, ...). The logic of each such function would need to have a
> direct implementation.
> 3) The filter on the inner query to keep a single value is removed and
> instead a collection is passed for evaluation in the join.
> 4) The boundaries of the SQL query are to be used as the boundaries that
> define the join window in which the verification is done.
> 5) The join behaves as an INNER JOIN from the condition point
> of view (a value is emitted only if it exists on the other side).
> [See attached document for schema]
> General logic of Join
> ---------------------
> leftDataStream.join(rightDataStream)
>     .where(new ConstantConditionSelector())
>     .equalTo(new ConstantConditionSelector())
>     .window([TIME/COUNT][TUMBLE/SLIDE]window.create())
>     //.trigger(new DefaultTrigger())
>     //.evictor(new DefaultEvictor())
>     .apply(new FlatJoinFunctionWithInSelection());

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
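The hourly trigger semantics from the functionality example can be checked with a small plain-Java simulation: ids from stream 2 that pass `b > 10` are collected per hour, and each stream 1 record is tested against the collection of the previous hour. This is only a sketch of the intended behavior, not Flink code. The class `HourlyInFilter`, its `Event` type, and the reduction of timestamps to an hour-of-day integer (which ignores wrap-around at midnight) are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical simulation of: SELECT * FROM stream1 WHERE id IN
//   (SELECT id2 FROM stream2 WHERE b > 10 GROUP BY FLOOR(PROCTIME TO HOUR))
public class HourlyInFilter {

    /** One input record; fromStream1 tells which stream it arrived on. */
    static final class Event {
        final int hour;            // FLOOR(proctime TO HOUR), reduced to the hour of day
        final boolean fromStream1;
        final String id;
        final int b;

        Event(int hour, boolean fromStream1, String id, int b) {
            this.hour = hour;
            this.fromStream1 = fromStream1;
            this.id = id;
            this.b = b;
        }
    }

    /** Processes events in time order and returns the emitted stream1 records. */
    static List<String> run(List<Event> eventsInTimeOrder) {
        Map<Integer, Set<String>> idsByHour = new HashMap<>();
        List<String> output = new ArrayList<>();
        for (Event e : eventsInTimeOrder) {
            if (!e.fromStream1) {
                // Inner query: keep the id only if it passes the b > 10 filter.
                if (e.b > 10) {
                    idsByHour.computeIfAbsent(e.hour, h -> new HashSet<>()).add(e.id);
                }
                continue;
            }
            // Main query: test against the collection produced for the previous hour.
            Set<String> previousHour =
                    idsByHour.getOrDefault(e.hour - 1, Collections.<String>emptySet());
            if (previousHour.contains(e.id)) {
                output.add(e.id + "," + e.b);
            }
        }
        return output;
    }

    /** The rows of the functionality example table, in arrival order. */
    static List<Event> sampleEvents() {
        return Arrays.asList(
                new Event(10, true, "Id1", 10),
                new Event(10, false, "Id2", 2),
                new Event(11, false, "Id3", 15),
                new Event(12, true, "Id2", 15),
                new Event(12, true, "Id3", 11),
                new Event(12, false, "Id2", 30),
                new Event(12, false, "Id3", 2),
                new Event(12, true, "Id2", 17),
                new Event(12, true, "Id3", 20));
    }

    public static void main(String[] args) {
        System.out.println(run(sampleEvents()));
    }
}
```

On the sample rows, only the two `Id3` records that arrive after 12:00 are emitted, matching the Output column of the table. `Id2,30` at 12:06 passes the inner filter but would only become visible to records arriving in the following hour.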