[ 
https://issues.apache.org/jira/browse/FLINK-6077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

radu updated FLINK-6077:
------------------------
    Attachment: in.png

> Support In/Exists/Except/Any/Some/All for Stream SQL
> -----------------------------------------------------
>
>                 Key: FLINK-6077
>                 URL: https://issues.apache.org/jira/browse/FLINK-6077
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>            Reporter: radu
>         Attachments: in.png
>
>
> Time target: Proc Time
> SQL targeted query examples:
> ----------------------------
> With inner query
> Q1. ```SELECT client FROM stream1 WHERE id 
> IN 
> (SELECT id FROM stream2 WHERE salary > 4500 
> GROUP BY FLOOR(proctime TO HOUR), id)```
> Comment: A concrete example of this query is selecting the customers whose
> country is in the list of the suppliers' countries (`SELECT customer FROM
> customers WHERE Country IN (SELECT Country FROM suppliers)`).
> Comment: This implementation depends on the implementation of the inner
> query. The structure can be the same as for inner query support, with the
> difference that the LogicalJoin between the main query and the inner query
> is conditional.
> Comment: The inner query needs a bound; otherwise it cannot be decided
> when to trigger.
> Comment: If the value is not triggered by the grouping expression, then
> the inner query must trigger based on when that expression changes value.
> Comment: Boundaries should be supported for all options: GROUP BY
> clauses, windows, or time expressions (\[FLOOR/CEIL\](rowtime/proctime TO
> HOUR)).
> With collection
> Q2. ```SELECT * FROM stream1 WHERE b 
> IN 
> (5000, 7000, 8000, 9000)```
> Comment: It should be checked whether this is already supported by the
> DataStreamCalc implementation. If not, it can become a sub-JIRA task to
> extend the DataStreamCalc functionality with this conditional
> behavior.
> Comment: A similar functionality can be provided if the collection is a
> table rather than a set of values.
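A literal IN list needs no state and no window: it can be evaluated record by record as a stateless filter, which is the role DataStreamCalc would play. The plain-Java sketch below shows only that evaluation for Q2; it is not Flink code, the class name `StaticInFilter` is hypothetical, and SQL NULL semantics are ignored.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Sketch: evaluates `b IN (5000, 7000, 8000, 9000)` from Q2 as a stateless,
// per-record filter. Class and method names are hypothetical.
final class StaticInFilter {
    // The literal IN list, built once and shared by every record.
    static final Set<Integer> IN_LIST = Set.of(5000, 7000, 8000, 9000);

    // True if a record's `b` value is contained in the literal list.
    static boolean matches(int b) {
        return IN_LIST.contains(b);
    }

    // Applies the filter to a batch of `b` values; a stream would call matches() per record.
    static List<Integer> filter(List<Integer> bValues) {
        return bValues.stream()
                .filter(StaticInFilter::matches)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(filter(List.of(4000, 5000, 7500, 9000))); // prints [5000, 9000]
    }
}
```

Because the set is fixed at planning time, a membership test is a constant-time hash lookup per record and needs no checkpointed state.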
> With table
> ```SELECT client FROM stream1 WHERE id 
> IN 
> (SELECT id FROM table1 WHERE stream1.id = table1.id)```
> Comment: This can be a sub-JIRA issue, perhaps within the context of dynamic
> tables, to support joins with tables and filtering operations based on
> contents from an external table.
> General comments: **Except** is similar in behavior to IN or EXISTS, as
> it filters out outputs of the main stream based on data from a secondary
> stream. The implementation will follow exactly the same logic as for
> IN/EXISTS, filtering the outputs in the join function between the main
> stream and the secondary stream. Additionally, the same restrictions
> apply to the secondary/inner queries.
> ```SELECT ID, NAME FROM CUSTOMERS LEFT JOIN ORDERS ON CUSTOMERS.ID = 
> ORDERS.CUSTOMER\_ID 
> EXCEPT
> SELECT ID, NAME FROM CUSTOMERS RIGHT JOIN ORDERS ON CUSTOMERS.ID = 
> ORDERS.CUSTOMER\_ID GROUP BY FLOOR(procTime TO HOUR);```
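Within one bounded window, EXCEPT (without ALL) keeps the distinct rows of the main side that do not appear on the secondary side. The following is a minimal plain-Java sketch of that per-window set difference, assuming both sides have already been collected for the same window and rows compare by value; the class `WindowedExcept` and the `"ID,NAME"` string encoding are hypothetical.

```java
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Sketch: EXCEPT (set semantics) evaluated over the rows of one bounded window.
// Rows are assumed to be compared by value; the class name is hypothetical.
final class WindowedExcept {
    // Distinct rows of `mainRows` (in arrival order) that do not occur in `secondaryRows`.
    static <T> Set<T> except(List<T> mainRows, List<T> secondaryRows) {
        Set<T> result = new LinkedHashSet<>(mainRows); // EXCEPT without ALL drops duplicates
        result.removeAll(new HashSet<>(secondaryRows));
        return result;
    }

    public static void main(String[] args) {
        // Rows encoded as "ID,NAME" strings purely for illustration.
        List<String> mainRows = List.of("1,Ann", "2,Bob", "2,Bob", "3,Cid");
        List<String> secondaryRows = List.of("2,Bob");
        System.out.println(except(mainRows, secondaryRows)); // prints [1,Ann, 3,Cid]
    }
}
```

This mirrors the filtering-in-the-join idea: the secondary side acts as a collection, and each main-side row is kept only if it is absent from it.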
> Description:
> ------------
> The IN and EXISTS operators are conditional clauses in the WHERE clause that
> check for values in certain collections. The collections against which
> the values are restricted can be either static (values, tables) or
> parts of a stream. This JIRA issue is concerned with the
> latter case of checking for values over a stream. For this
> operation to work, the stream needs to be bounded so that the result
> can trigger and the collection can be formed. This points to using
> boundaries or groupings over the sub-query that forms the
> collection over which IN is applied. This should be supported via 3
> options as shown below. Each of these options can be a sub-JIRA issue.
> 1)  Group By clauses that are applied over some monotonic order of the
>     stream based on which ranges are defined.
> `[...] GROUP BY prodId`
> 3)  Window clauses to define rolling partitions of the data of the
>     stream, which evolve in time.
> `[...] WINDOW HOUR AS (RANGE INTERVAL '10' MINUTE TO SECOND(3) PRECEDING);`
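The boundary options above come down to assigning each record to a bounded group. As an illustration only, `FLOOR(proctime TO HOUR)` maps a timestamp to the start of its hour, and a `RANGE ... PRECEDING` window keeps rows whose timestamp lies within the interval before the current row. The helper names below are hypothetical, timestamps are assumed to be epoch milliseconds interpreted in UTC, and the lower bound of the range is assumed to be inclusive.

```java
import java.time.Duration;
import java.time.Instant;

// Sketch of the two kinds of bounds; names are hypothetical and timestamps are
// assumed to be epoch milliseconds interpreted in UTC.
final class Boundaries {
    static final long HOUR_MS = Duration.ofHours(1).toMillis();

    // FLOOR(ts TO HOUR): start of the one-hour group that contains ts.
    static long floorToHour(long tsMillis) {
        return Math.floorDiv(tsMillis, HOUR_MS) * HOUR_MS;
    }

    // RANGE <range> PRECEDING: is a row at rowTs inside the window that ends at currentTs?
    static boolean inPrecedingRange(long rowTs, long currentTs, Duration range) {
        return rowTs <= currentTs && currentTs - rowTs <= range.toMillis();
    }

    public static void main(String[] args) {
        long now = Instant.parse("2017-03-16T12:05:00Z").toEpochMilli();
        System.out.println(Instant.ofEpochMilli(floorToHour(now))); // prints 2017-03-16T12:00:00Z
        long earlier = Instant.parse("2017-03-16T11:58:00Z").toEpochMilli();
        System.out.println(inPrecedingRange(earlier, now, Duration.ofMinutes(10))); // prints true
    }
}
```

A GROUP BY on `floorToHour` yields tumbling, non-overlapping groups, while the preceding-range check yields a window that slides with every row.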
> Functionality example
> ---------------------
> The example below illustrates how IN/EXISTS works on streams.
> ```SELECT * FROM stream1 WHERE id IN (SELECT id2 FROM stream2 WHERE b > 10 
> GROUP BY FLOOR(PROCTIME TO HOUR), id2)```
> Note: The inner query triggers only once per hour. During the next hour, the 
> inner query's result for the previous hour is used to filter the results of 
> the main query as they arrive. This is also consistent with how inner queries 
> are translated (see inner queries).
> ||IngestionTime(Event)||Stream 1||Stream 2||Output||
> |10:00:01|Id1,10| |nil|
> |10:02:00| |Id2,2| |
> |11:25:00| |Id3,15| |
> |12:03:00|Id2,15| |nil|
> |12:05:00|Id3,11| |Id3,11|
> |12:06:00| |Id2,30| |
> |12:07:00| |Id3,2| |
> |12:09:00|Id2,17| |nil|
> |12:10:00|Id3,20| |Id3,20|
> |...| | | |
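The table can be reproduced with a small plain-Java simulation, assuming (as the note says) that the main query only sees the inner query's result for the previous complete hour, and that events arrive in time order within a single day. `HourlyInFilter` and its `Event` type are hypothetical names; the events are the ones from the table.

```java
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simulation of the IN example: the main query is filtered by the inner query's
// result from the previous hour. Names are hypothetical; events arrive in time order.
final class HourlyInFilter {
    static final class Event {
        final String stream;
        final LocalTime time;
        final String id;
        final int value;

        Event(String stream, String time, String id, int value) {
            this.stream = stream;
            this.time = LocalTime.parse(time);
            this.id = id;
            this.value = value;
        }
    }

    static List<String> run(List<Event> events) {
        // Inner query result per hour: ids from stream2 with b > 10.
        Map<Integer, Set<String>> innerByHour = new HashMap<>();
        List<String> output = new ArrayList<>();
        for (Event e : events) {
            int hour = e.time.getHour(); // FLOOR(PROCTIME TO HOUR) within one day
            if (e.stream.equals("stream2")) {
                if (e.value > 10) {
                    innerByHour.computeIfAbsent(hour, h -> new HashSet<>()).add(e.id);
                }
            } else {
                // Only the completed previous hour is visible to the main query.
                Set<String> previous = innerByHour.getOrDefault(hour - 1, Set.of());
                if (previous.contains(e.id)) {
                    output.add(e.id + "," + e.value);
                }
            }
        }
        return output;
    }

    // The events from the table above.
    static List<Event> exampleEvents() {
        return List.of(
                new Event("stream1", "10:00:01", "Id1", 10),
                new Event("stream2", "10:02:00", "Id2", 2),
                new Event("stream2", "11:25:00", "Id3", 15),
                new Event("stream1", "12:03:00", "Id2", 15),
                new Event("stream1", "12:05:00", "Id3", 11),
                new Event("stream2", "12:06:00", "Id2", 30),
                new Event("stream2", "12:07:00", "Id3", 2),
                new Event("stream1", "12:09:00", "Id2", 17),
                new Event("stream1", "12:10:00", "Id3", 20));
    }

    public static void main(String[] args) {
        System.out.println(run(exampleEvents())); // prints [Id3,11, Id3,20]
    }
}
```

Note that `Id2,30` arriving at 12:06 does not make `Id2,17` at 12:09 match, because it belongs to the 12:00 group, which only becomes visible during the 13:00 hour.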
> Implementation option
> ---------------------
> Considering that the query only makes sense 1) within window
> boundaries and 2) over sub-queries that extract collections of data, the
> design builds on the inner query implementation, with the modifications
> listed below. As a recap, the inner query is implemented with a special
> join \[LEFT type with an always-true condition\] between the main stream
> and the output of the inner query, which is passed through a single-value
> selection aggregation. The modifications are:
> 1)  The LogicalJoin no longer outputs results under an always-true
>     condition. Instead, the condition is evaluated within the
>     window function by checking that the input from the main stream is
>     contained in the collection produced by the inner query.
> 2)  The check depends on the type of function used (IN, ANY,
>     SOME, …). The logic of each such function needs a direct
>     implementation.
> 3)  The filter on the inner query that keeps a single value is removed;
>     instead, a collection is passed to the join for evaluation.
> 4)  The boundaries of the SQL query are used as the boundaries that
>     define the join window in which the verification is done.
> 5)  From the condition's point of view, the join behaves as an INNER
>     JOIN (a value is emitted only if it exists on the other side).
> [See attached document for schema]
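Item 2 calls for a separate check per operator. Ignoring SQL NULL handling, the standard semantics can be sketched over the collection gathered for one window: `x IN (C)` equals `x = ANY (C)`, `ANY`/`SOME` need one matching element, `ALL` needs every element to match (vacuously true for an empty collection), and `EXISTS` needs a non-empty collection. The class and enum names below are hypothetical, and only three comparison operators are shown.

```java
import java.util.Collection;
import java.util.List;

// Sketch of per-operator checks against the collection produced by the inner query
// for one window. SQL NULL semantics are ignored; names are hypothetical.
final class SubqueryPredicates {
    enum Comparison {
        EQ, GT, LT;

        boolean apply(long left, long right) {
            switch (this) {
                case EQ: return left == right;
                case GT: return left > right;
                default: return left < right;
            }
        }
    }

    // x IN (C) is equivalent to x = ANY (C).
    static boolean in(long x, Collection<Long> c) {
        return any(x, Comparison.EQ, c);
    }

    // x <op> ANY (C), also spelled SOME: holds for at least one element.
    static boolean any(long x, Comparison op, Collection<Long> c) {
        return c.stream().anyMatch(v -> op.apply(x, v));
    }

    // x <op> ALL (C): holds for every element; true when C is empty.
    static boolean all(long x, Comparison op, Collection<Long> c) {
        return c.stream().allMatch(v -> op.apply(x, v));
    }

    // EXISTS (C): the inner query produced at least one row in this window.
    static boolean exists(Collection<?> c) {
        return !c.isEmpty();
    }

    public static void main(String[] args) {
        List<Long> window = List.of(5L, 20L, 30L);
        System.out.println(in(20, window));                 // prints true
        System.out.println(any(10, Comparison.GT, window)); // prints true
        System.out.println(all(10, Comparison.GT, window)); // prints false
        System.out.println(exists(List.of()));              // prints false
    }
}
```

Since IN reduces to `= ANY`, a join function would only need the ANY/ALL/EXISTS primitives plus the comparison operator extracted from the plan.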
> General logic of Join
> ---------------------
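> ```java
> import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
> import org.apache.flink.streaming.api.windowing.time.Time;
> // ConstantConditionSelector: proposed KeySelector returning the same key for every record.
> // FlatJoinFunctionWithInSelection: proposed FlatJoinFunction applying the IN/ANY/... check.
> leftDataStream.join(rightDataStream)
>     .where(new ConstantConditionSelector())
>     .equalTo(new ConstantConditionSelector())
>     // [TIME/COUNT][TUMBLE/SLIDE] window, e.g. one-hour processing-time tumbling windows:
>     .window(TumblingProcessingTimeWindows.of(Time.hours(1)))
>     //.trigger(new DefaultTrigger())
>     //.evictor(new DefaultEvictor())
>     .apply(new FlatJoinFunctionWithInSelection());
> ```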
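One point worth checking for the join sketched above: Flink's windowed `join` calls the join function once per (left, right) pair that shares a key and a window, so with a constant key a left row matching several right rows would be emitted several times, and the function never sees the whole collection at once. `coGroup(...).where(...).equalTo(...).window(...)` hands both sides of a window to a `CoGroupFunction` as `Iterable`s, which may fit items 1) and 3) more directly. The plain-Java simulation below is not Flink code (`WindowedInJoin` and `Timed` are hypothetical); it only illustrates the intended per-window semantics with tumbling windows and a constant key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

// Simulation of a constant-key, tumbling-window IN join. Names are hypothetical.
final class WindowedInJoin {
    // Minimal timestamped element; in a real job this would be a stream record.
    static final class Timed<T> {
        final long ts;
        final T value;

        Timed(long ts, T value) {
            this.ts = ts;
            this.value = value;
        }
    }

    // Assigns elements to tumbling windows of sizeMs, keyed by window start.
    // Every element has the same (constant) join key, so only the window splits the data.
    static <T> Map<Long, List<T>> tumble(List<Timed<T>> input, long sizeMs) {
        Map<Long, List<T>> windows = new TreeMap<>();
        for (Timed<T> e : input) {
            long start = Math.floorDiv(e.ts, sizeMs) * sizeMs;
            windows.computeIfAbsent(start, k -> new ArrayList<>()).add(e.value);
        }
        return windows;
    }

    // Per window, emits each left element whose key is IN the right side's collection.
    // The right side is turned into a set, so a left element is emitted at most once.
    static <L, R, K> List<L> inJoin(List<Timed<L>> left, List<Timed<R>> right, long sizeMs,
                                     Function<L, K> leftKey, Function<R, K> rightKey) {
        Map<Long, List<L>> leftWindows = tumble(left, sizeMs);
        Map<Long, List<R>> rightWindows = tumble(right, sizeMs);
        List<L> out = new ArrayList<>();
        for (Map.Entry<Long, List<L>> w : leftWindows.entrySet()) {
            Set<K> collection = rightWindows.getOrDefault(w.getKey(), List.of())
                    .stream().map(rightKey).collect(Collectors.toSet());
            for (L l : w.getValue()) {
                if (collection.contains(leftKey.apply(l))) {
                    out.add(l);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        Function<String, String> id = Function.identity();
        List<Timed<String>> left = List.of(
                new Timed<>(0L, "a"), new Timed<>(10L, "b"), new Timed<>(hour, "a"));
        List<Timed<String>> right = List.of(
                new Timed<>(5L, "a"), new Timed<>(6L, "a"), new Timed<>(hour + 5, "c"));
        System.out.println(inJoin(left, right, hour, id, id)); // prints [a]
    }
}
```

In the first window "a" matches two right rows but is emitted once; a pairwise join function would emit it twice unless it deduplicated itself.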



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
