[ https://issues.apache.org/jira/browse/FLINK-4565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15469873#comment-15469873 ]

Timo Walther commented on FLINK-4565:
-------------------------------------

Calcite translates the IN operator in
{{org.apache.calcite.sql2rel.SqlToRelConverter#convertExpression}}, turning it
into an Aggregate and a Join. After fixing an issue in "DataSetAggregate", we
can execute {{"SELECT WordCount.word FROM WordCount WHERE WordCount.word IN (SELECT WordCount1.word AS w FROM WordCount1)"}}.
The plan looks like this:

{code}
== Physical Execution Plan ==
Stage 4 : Data Source
        content : collect elements with CollectionInputFormat
        Partitioning : RANDOM_PARTITIONED

        Stage 3 : Map
                content : from: (word, frequency)
                ship_strategy : Forward
                exchange_mode : BATCH
                driver_strategy : Map
                Partitioning : RANDOM_PARTITIONED

                Stage 8 : Map
                        content : from: (word, frequency)
                        ship_strategy : Forward
                        exchange_mode : BATCH
                        driver_strategy : Map
                        Partitioning : RANDOM_PARTITIONED

                        Stage 7 : Map
                                content : prepare select: (word)
                                ship_strategy : Forward
                                exchange_mode : PIPELINED
                                driver_strategy : Map
                                Partitioning : RANDOM_PARTITIONED

                                Stage 6 : GroupCombine
                                        content : groupBy: (word), select:(word)
                                        ship_strategy : Forward
                                        exchange_mode : PIPELINED
                                        driver_strategy : Sorted Combine
                                        Partitioning : RANDOM_PARTITIONED

                                        Stage 5 : GroupReduce
                                                content : groupBy: (word), select:(word)
                                                ship_strategy : Hash Partition on [0]
                                                exchange_mode : PIPELINED
                                                driver_strategy : Sorted Group Reduce
                                                Partitioning : RANDOM_PARTITIONED

                                                Stage 2 : Join
                                                        content : where: (=(word, w)), join: (word, frequency, w)
                                                        ship_strategy : Hash Partition on [0]
                                                        exchange_mode : PIPELINED
                                                        driver_strategy : Hybrid Hash (build: from: (word, frequency) (id: 3))
                                                        Partitioning : RANDOM_PARTITIONED

                                                        Stage 1 : FlatMap
                                                                content : select: (word)
                                                                ship_strategy : Forward
                                                                exchange_mode : PIPELINED
                                                                driver_strategy : FlatMap
                                                                Partitioning : RANDOM_PARTITIONED

                                                                Stage 0 : Data Sink
                                                                        content : org.apache.flink.api.java.io.DiscardingOutputFormat
                                                                        ship_strategy : Forward
                                                                        exchange_mode : PIPELINED
                                                                        Partitioning : RANDOM_PARTITIONED
{code}
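To illustrate why the plan contains a GroupCombine/GroupReduce on (word) before the Join, here is a minimal sketch in plain Python (not the Flink or Calcite API) of the IN-to-Aggregate-and-Join rewrite. The function names {{in_filter}} and {{join_rewrite}} and the {{deduplicate}} flag are hypothetical; SQL NULL semantics are deliberately not modeled. The sketch builds the hash table on the subquery side for simplicity, whereas the plan above happens to build on the (word, frequency) input; the choice of build side does not change the result.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

Row = Tuple[str, int]  # (word, frequency), mirroring the WordCount schema


def in_filter(outer: Iterable[Row], inner: Iterable[str]) -> List[str]:
    """Reference semantics of: SELECT word FROM outer WHERE word IN (SELECT w FROM inner)."""
    inner_set = set(inner)
    return [word for word, _ in outer if word in inner_set]


def join_rewrite(outer: Iterable[Row], inner: Iterable[str], deduplicate: bool = True) -> List[str]:
    """Hypothetical sketch of IN evaluated as Aggregate (DISTINCT w) + equi-join on word = w."""
    # Aggregate: group the subquery by w so each key appears exactly once.
    # dict.fromkeys keeps first-seen order while dropping duplicates.
    keys = list(dict.fromkeys(inner)) if deduplicate else list(inner)

    # Join: build a hash table on the subquery side, then probe it with each outer row.
    table: Dict[str, List[str]] = defaultdict(list)
    for w in keys:
        table[w].append(w)
    joined = [(word, freq, w) for word, freq in outer for w in table.get(word, [])]

    # Project: keep only the word column, like the final "select: (word)" stage.
    return [word for word, _, _ in joined]
```

Without the aggregate ({{deduplicate=False}}), a value that occurs twice in the subquery would emit the matching outer row twice, which IN semantics forbid. That is why the rewrite needs the distinct step before the join.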

> Support for SQL IN operator
> ---------------------------
>
>                 Key: FLINK-4565
>                 URL: https://issues.apache.org/jira/browse/FLINK-4565
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>            Reporter: Timo Walther
>            Assignee: Simone Robutti
>
> It seems that Flink SQL already supports the uncorrelated subquery IN operator,
> but it should also be available in the Table API and be tested.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
