[ https://issues.apache.org/jira/browse/FLINK-6073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15945125#comment-15945125 ]

radu commented on FLINK-6073:
-----------------------------

Hi [~fhueske], thanks again for the feedback - it is really helpful, and I 
understand what you mean regarding the retraction. I think that, with respect 
to this, we can create more aggregators, such as for a single value, for a list 
of outputs, join map... on which we can apply the retraction in a similar way 
to the model we used. But I think this makes sense after FLINK-6047 is done.

Anyway, in the meantime I like your idea of reshaping the query and then 
working on it (whether now or after FLINK-5884 is done).

I have parsed the query model you proposed:
SELECT A1, B1
  FROM T1, T2
 WHERE T1.A3 = (SELECT Max(T1.A3) FROM T1
                 WHERE T1.A3 <= T2.B3)

The resulting logical plan is:

LogicalProject(A1=[$5], B1=[$2])
  LogicalJoin(condition=[=($7, $1)], joinType=[inner])
    LogicalJoin(condition=[=($4, $0)], joinType=[inner])
      LogicalAggregate(group=[{1}], EXPR$0=[MAX($0)])
        LogicalJoin(condition=[<=($0, $1)], joinType=[inner])
          LogicalProject(A3=[$2])
            LogicalTableScan(table=[[T1]])
          LogicalAggregate(group=[{1}])
            LogicalJoin(condition=[true], joinType=[inner])
              LogicalProject(A1=[$0])
                LogicalTableScan(table=[[T1]])
              LogicalProject(B3=[$2])
                LogicalTableScan(table=[[T2]])
      LogicalTableScan(table=[[T2]])
    LogicalTableScan(table=[[T1]])


The problem is not that it is verbose (or that the query is more verbose), but:
1) it has many operators that are needed to implement the corresponding logic. 
One option is, of course, to implement each logical operator as a 
corresponding physical operator. On the one hand this would give the correct 
logic, but on the other hand it would be very costly in terms of compute 
resources. The alternative would be to identify this pattern in the parsed 
query and map it to the simple implementation I proposed in the design 
document.
2) we can consider (perhaps not for the currency exchange scenario, but for 
others) defining a window boundary for the inner query
3) if you look at how the query is parsed, at its core there is the following 
pattern:
   LogicalJoin(condition=[true], joinType=[inner])
     LogicalProject(A1=[$0])
       LogicalTableScan(table=[[T1]])
     LogicalProject(B3=[$2])
       LogicalTableScan(table=[[T2]])
...this is again an unconditional join without boundaries, which in the end 
suffers from the same problem as the initial query model: it does not comply 
with the batch equivalence.

Considering all this, I would propose 2 potential middle-ground options:
1) Have the inner query work on time boundaries, for example a query where we 
extract the max timestamp:
SELECT T1.A1,
       (SELECT Max(T2.B1) OVER (ORDER BY T2.B4 RANGE INTERVAL '1' HOUR PRECEDING)
          FROM T2)
  FROM T1

which would be translated to:
LogicalJoin(condition=[true], joinType=[left])
  LogicalProject(A1=[$0])
    LogicalTableScan(table=[[T1]])
  LogicalAggregate(group=[{}], agg#0=[SINGLE_VALUE($2)])
    LogicalWindow(window#0=[window(partition {} order by [1] range between $2 PRECEDING and CURRENT ROW aggs [MAX($0)])])
      LogicalProject(B1=[$0], B4=[$3])
        LogicalTableScan(table=[[T2]])
=> this is simpler, and the join could be driven by the time boundaries of the 
window
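Option 1 keeps the inner query's state bounded by its window. As a minimal sketch, assuming processing-time timestamps in milliseconds are passed in by the caller (instead of reading the system clock) and using hypothetical class names, the `MAX(T2.B1) OVER (... RANGE INTERVAL '1' HOUR PRECEDING)` value that each T1 row is joined with could be maintained with a monotonic deque, so T2 rows older than one hour are dropped instead of being kept forever:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.OptionalDouble;

/** Hypothetical helper: MAX over the values received in the last rangeMillis (inclusive). */
final class SlidingRangeMax {
    private static final class Entry {
        final long ts;
        final double value;

        Entry(long ts, double value) {
            this.ts = ts;
            this.value = value;
        }
    }

    private final long rangeMillis;
    // Values strictly decrease from head to tail, so the head is the current maximum.
    private final Deque<Entry> deque = new ArrayDeque<>();

    SlidingRangeMax(long rangeMillis) {
        this.rangeMillis = rangeMillis;
    }

    /** Inner query input (a T2 row); timestamps are assumed non-decreasing. */
    void add(long ts, double value) {
        // An older value that is not larger can never be the maximum again.
        while (!deque.isEmpty() && deque.peekLast().value <= value) {
            deque.pollLast();
        }
        deque.addLast(new Entry(ts, value));
    }

    /** Main query input (a T1 row): MAX over [now - range, now]; empty means the LEFT join yields NULL. */
    OptionalDouble maxAt(long now) {
        // Evict rows that fell out of the RANGE window; with non-decreasing `now` they never re-enter it.
        while (!deque.isEmpty() && deque.peekFirst().ts < now - rangeMillis) {
            deque.pollFirst();
        }
        return deque.isEmpty() ? OptionalDouble.empty() : OptionalDouble.of(deque.peekFirst().value);
    }
}

class SlidingRangeMaxDemo {
    public static void main(String[] args) {
        long minute = 60_000L;
        SlidingRangeMax max = new SlidingRangeMax(60 * minute);
        max.add(0L, 5.0);          // T2 row at t = 0 min
        max.add(10 * minute, 3.0); // T2 row at t = 10 min
        // A T1 row at t = 30 min is joined with 5.0; one at t = 65 min with 3.0.
        System.out.println(max.maxAt(30 * minute).getAsDouble());
        System.out.println(max.maxAt(65 * minute).getAsDouble());
    }
}
```

The state never holds more than the last hour of T2, which is what lets the join be driven by the window boundaries rather than by an unbounded history.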

2) Have the inner query use a time condition, as in the example you gave:
SELECT T1.A1,
       (SELECT T2.B1 FROM T2 WHERE T1.A4 <= T2.B4)
  FROM T1

which already results in a fairly complex plan:
 LogicalProject(A1=[$0], EXPR$1=[$5])
  LogicalJoin(condition=[=($3, $4)], joinType=[left])
    LogicalTableScan(table=[[T1]])
    LogicalAggregate(group=[{2}], agg#0=[SINGLE_VALUE($0)])
      LogicalJoin(condition=[<=($2, $1)], joinType=[inner])
        LogicalProject(B1=[$0], B4=[$3])
          LogicalTableScan(table=[[T2]])
        LogicalAggregate(group=[{3}])
          LogicalTableScan(table=[[T1]])

=> in which case we can directly consider the proper query you proposed

I would propose to focus on option 1, as it also makes sense from the 
perspective of the other queries... What do you think?


> Support for SQL inner queries for proctime
> ------------------------------------------
>
>                 Key: FLINK-6073
>                 URL: https://issues.apache.org/jira/browse/FLINK-6073
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: radu
>            Assignee: radu
>            Priority: Critical
>              Labels: features
>         Attachments: innerquery.png
>
>
> Time target: Proc Time
> **SQL targeted query examples:**
>  
> Q1) `Select  item, (select item2 from stream2 ) as itemExtern from stream1;`
> Comments: This is the main functionality targeted by this JIRA: combining, in 
> the main query, results from an inner query.
> Q2) `Select  s1.item, (Select a2 from table as t2 where t2.id = s1.id  
> limit 1) from s1;`
> Comments:
> Another equivalent way to write the first example of an inner query is with 
> limit 1. This ensures the equivalence with the SingleElementAggregation used 
> when translating the main target syntax for inner queries. We must ensure 
> that the 2 syntaxes are supported and implemented with the same 
> functionality. There is also the option to select elements in the inner 
> query from a table, not just from a different stream. Implementing this 
> support should be a separate sub-JIRA issue.
> **Description:**
> When parsed via Calcite, the SQL inner query is translated to a join (a left 
> join with an always-true condition) between the output of the query on the 
> main stream and the output of a single-output aggregation on the inner 
> query. The translation logic is shown below:
> ```
> LogicalJoin [condition=true;type=LEFT]
>       LogicalSingleValue[type=aggregation]
>               …logic of inner query (LogicalProject, LogicalScan…)
>       …logic of main, external query (LogicalProject, LogicalScan…)
> ```
> `LogicalJoin[condition=true;type=LEFT]`: it can be considered a special-case 
> operation rather than a proper stream-to-stream join. The implementation 
> should attach to the output of the main stream a value from a different 
> query. 
> `LogicalSingleValue[type=aggregation]`: it can be interpreted as the holder 
> of the single value that results from the inner query. As this operator 
> guarantees that the inner query brings no more than one value to the join, 
> there are several options for its functionality in the streaming context:
> 1.    Throw an error if the inner query returns more than one result. This 
> would be the typical behavior of standard SQL over a DB. However, it is very 
> unlikely that a stream would emit only a single value, so such a behavior 
> would be very limiting for streams in the inner query. It might be more 
> useful and common if the inner query is over a table. 
> 2.    We can interpret the usage of this operator as the guarantee that at 
> any moment only one value is selected. The behavior would then rather be 
> that of a filter selecting one value. This means that the output of this 
> operator evolves in time with the second stream that drives the inner 
> query. When the value evolves should depend on what marks the evolution of 
> the stream (processing time, watermarks/event time, ingestion time, window 
> time partitions…).
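The two options differ only in what happens when the inner query produces a second value. A minimal sketch of a holder supporting both, where `SingleValueHolder` and its `Mode` values are hypothetical names (not Flink or Calcite APIs):

```java
import java.util.Optional;

/** Hypothetical holder modelling the two candidate semantics of LogicalSingleValue. */
final class SingleValueHolder<T> {
    enum Mode { FAIL_ON_SECOND_VALUE, KEEP_LATEST }

    private final Mode mode;
    private T value; // null until the inner query emits its first result

    SingleValueHolder(Mode mode) {
        this.mode = mode;
    }

    /** Called for every result produced by the inner query. */
    void update(T newValue) {
        if (value != null && mode == Mode.FAIL_ON_SECOND_VALUE) {
            // Option 1: like a scalar subquery in standard SQL, more than one row is an error.
            throw new IllegalStateException("inner query returned more than one value");
        }
        // Option 2 (or the first value under option 1): the held value evolves with the inner stream.
        value = newValue;
    }

    /** Value attached to the next main-query row; empty means the LEFT join emits NULL. */
    Optional<T> current() {
        return Optional.ofNullable(value);
    }
}

class SingleValueHolderDemo {
    public static void main(String[] args) {
        SingleValueHolder<Double> rate = new SingleValueHolder<>(SingleValueHolder.Mode.KEEP_LATEST);
        rate.update(1.2);
        rate.update(1.3);
        System.out.println(rate.current()); // Optional[1.3]
    }
}
```

Under option 1 the second `update` throws, mirroring a scalar subquery that returns more than one row; under option 2 it simply replaces the held value.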
>  In this JIRA issue the evolution is marked by processing time. For this 
> implementation the operator would work based on option 2. Hence, at every 
> moment the state of the operator, which holds one value, can evolve with the 
> latest elements. In this way the logic of the inner query is to always select 
> the last element (its fields, or other query-related transformations based on 
> the last value). This behavior is needed in many scenarios, e.g., the typical 
> problem of computing the total income when incomes are in multiple 
> currencies and the total needs to be computed in one currency, always using 
> the last exchange rate.
> This behavior is also motivated by the functionality of the 3rd SQL query 
> example, Q3 (using an inner query as the input source for FROM). In such 
> scenarios, the selection in the main query would need to be based on the 
> latest elements. With such a behavior, the 2 types of queries (Q1 and Q3) 
> would provide the same, intuitive result.
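For the currency example, a minimal sketch of option 2 in processing time could look as follows. It assumes events arrive already ordered by processing time, keeps one latest rate per source currency (an assumption that goes beyond the single-value case), and skips incomes whose currency has no rate yet rather than converting them with NULL; `LatestRateTotal` and its methods are hypothetical names:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: running total income in one target currency, always using the latest rate. */
final class LatestRateTotal {
    // Latest exchange rate per source currency: the evolving "single value" of the inner query.
    private final Map<String, Double> latestRate = new HashMap<>();
    private double total;

    /** Inner stream: a new rate simply overwrites the previous one (option 2). */
    void onRate(String currency, double rateToTarget) {
        latestRate.put(currency, rateToTarget);
    }

    /** Main stream: converts the income with the rate current at processing time; returns the total. */
    double onIncome(String currency, double amount) {
        Double rate = latestRate.get(currency);
        if (rate != null) {
            total += amount * rate;
        }
        // Incomes with no known rate are skipped here (an assumption; a LEFT join would yield NULL).
        return total;
    }

    public static void main(String[] args) {
        LatestRateTotal income = new LatestRateTotal();
        income.onRate("USD", 0.5);
        income.onIncome("USD", 10);                    // converted at 0.5
        income.onRate("USD", 0.75);
        System.out.println(income.onIncome("USD", 4)); // 5.0 + 3.0 = 8.0
    }
}
```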
> **Functionality example**
> Based on the logical translation plan, we next illustrate the behavior of the 
> inner query applied to 2 streams that operate on processing time.
> SELECT amount, (SELECT exchange FROM inputstream1) AS field1 FROM inputstream2
> ||Time||Stream1||Stream2||Output||
> |T1| |1.2| |
> |T2|User1,10| |(10,1.2)|
> |T3|User2,11| |(11,1.2)|
> |T4| |1.3| |
> |T5|User3,9| |(9,1.3)|
> |...| | | |
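The table can be replayed with a small simulation of the single-value state. A minimal sketch, assuming both streams are merged into one sequence ordered by processing time; `ProcTimeReplay` and its `Event` type are hypothetical:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ProcTimeReplay {
    /** One input element: either an exchange rate (inner query) or a user amount (main query). */
    static final class Event {
        final String user;     // null for exchange-rate events
        final int amount;
        final double exchange;

        private Event(String user, int amount, double exchange) {
            this.user = user;
            this.amount = amount;
            this.exchange = exchange;
        }

        static Event rate(double exchange) { return new Event(null, 0, exchange); }
        static Event amount(String user, int amount) { return new Event(user, amount, 0.0); }
    }

    /** Emits (amount, exchange) for every main-stream element, using the latest exchange rate. */
    static List<String> replay(List<Event> events) {
        Double latestExchange = null; // state of the single-value operator
        List<String> output = new ArrayList<>();
        for (Event e : events) {
            if (e.user == null) {
                latestExchange = e.exchange; // update the held value, emit nothing
            } else {
                // LEFT join semantics: "null" is emitted if no rate has arrived yet.
                output.add("(" + e.amount + "," + latestExchange + ")");
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                Event.rate(1.2),           // T1
                Event.amount("User1", 10), // T2
                Event.amount("User2", 11), // T3
                Event.rate(1.3),           // T4
                Event.amount("User3", 9)); // T5
        System.out.println(replay(events)); // [(10,1.2), (11,1.2), (9,1.3)]
    }
}
```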
> Note 1. For streams operating on event time, at moment T4 we would need to 
> retract the previous outputs ((10,1.2), (11,1.2)) and re-emit them as 
> ((10,1.3), (11,1.3)). 
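The retraction described in Note 1 can be sketched as a changelog in which every earlier output is retracted and re-emitted when a new inner value arrives. This is a minimal illustration: the `+`/`-` string encoding and the class name are hypothetical and not Flink's retraction API, and the list of emitted rows kept for retraction grows with every main-stream element:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of Note 1: retract and re-emit earlier outputs when the inner value changes. */
final class RetractingSingleValueJoin {
    private Double latest;                                          // current inner value (null = none yet)
    private final List<Integer> emittedAmounts = new ArrayList<>(); // every main-stream row joined so far
    private final List<String> changelog = new ArrayList<>();       // "+" = insert, "-" = retraction

    /** Inner query input: retract each earlier result and re-emit it with the new value. */
    void onExchange(double rate) {
        for (int amount : emittedAmounts) {
            changelog.add("-(" + amount + "," + latest + ")");
            changelog.add("+(" + amount + "," + rate + ")");
        }
        latest = rate;
    }

    /** Main query input: emit the row joined with the current value. */
    void onAmount(int amount) {
        emittedAmounts.add(amount);
        changelog.add("+(" + amount + "," + latest + ")");
    }

    List<String> changelog() {
        return changelog;
    }

    public static void main(String[] args) {
        RetractingSingleValueJoin join = new RetractingSingleValueJoin();
        join.onExchange(1.2);
        join.onAmount(10);
        join.onAmount(11);
        join.onExchange(1.3);
        // [+(10,1.2), +(11,1.2), -(10,1.2), +(10,1.3), -(11,1.2), +(11,1.3)]
        System.out.println(join.changelog());
    }
}
```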
> Note 2. Rather than failing when a new value arrives in the inner query, we 
> just update the state that holds the single value. If option 1 for the 
> behavior of LogicalSingleValue were chosen, then an error would be triggered 
> at moment T4.
> **Implementation option**
> Considering the notes and the options for the behavior, the operator would be 
> implemented using Flink's join function with a custom always-true join 
> condition and an inner selection of the output based on the incoming 
> direction (to mimic the left join). The single value selection can be 
> implemented with a stateful flatMap. If the join is executed in parallel by 
> multiple operator instances, then we either use a parallelism of 1 for the 
> stateful flatMap (option 1) or broadcast the outputs of the flatMap to all 
> join instances to ensure consistent results (option 2). Since the flatMap's 
> task of selecting one value is lightweight, option 1 is better. The design 
> schema is shown below.
> !innerquery.png!
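To illustrate why option 2 needs a broadcast, here is a minimal sketch that simulates several parallel join instances in one process: each instance keeps its own copy of the single value, main-stream rows are routed to one instance by key hash, and every inner-query result is broadcast to all copies. The class name and the hash routing are assumptions for illustration only; the design above itself prefers option 1.

```java
import java.util.Arrays;

/** Hypothetical simulation of option 2: the single value replicated on every join instance. */
final class BroadcastSingleValueJoin {
    // One copy of the single-value state per parallel join instance (null = no value yet).
    private final Double[] valuePerInstance;

    BroadcastSingleValueJoin(int parallelism) {
        valuePerInstance = new Double[parallelism];
    }

    /** Option 2: the flatMap output is broadcast, so every instance sees the new value. */
    void broadcast(double value) {
        Arrays.fill(valuePerInstance, Double.valueOf(value));
    }

    /** A main-stream row is processed by exactly one instance, chosen here by key hash. */
    String join(String key, int amount) {
        int instance = Math.floorMod(key.hashCode(), valuePerInstance.length);
        return "(" + amount + "," + valuePerInstance[instance] + ")";
    }

    public static void main(String[] args) {
        BroadcastSingleValueJoin join = new BroadcastSingleValueJoin(4);
        join.broadcast(1.2);
        System.out.println(join.join("User1", 10));
        System.out.println(join.join("User2", 11));
        join.broadcast(1.3);
        System.out.println(join.join("User3", 9));
    }
}
```

If the value were sent to only one instance instead, rows routed to the other instances would be joined with NULL or a stale value, which is the inconsistency the broadcast avoids.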
> **General logic of Join**
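> ```
> // ConstantConditionSelector, LeftFireTrigger, SingleValueEvictor and
> // InnerQueryJoinFunction are custom classes that still need to be written.
> leftDataStream.join(rightDataStream)
>     // the same constant key on both sides makes every pair match (condition=true)
>     .where(new ConstantConditionSelector())
>     .equalTo(new ConstantConditionSelector())
>     .window(GlobalWindows.create())       // e.g. one window that holds all elements
>     .trigger(new LeftFireTrigger())       // fire when a main-stream (left) element arrives
>     .evictor(new SingleValueEvictor())    // e.g. evict outdated inner-query values
>     .apply(new InnerQueryJoinFunction()); // JoinFunction attaching the value to the left row
> ```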



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
