Hi all,

I would like to open a JIRA issue (and then provide the implementation) for 
supporting inner queries. The idea is to be able to support SQL queries like the 
ones presented in the scenarios below. The key point is that supporting inner 
queries would require implementations of:

- JOIN (type = left and condition = true) - basically a simple 
implementation of a join function between 2 streams that does not require any 
window support behind the scenes, as there is no condition on which to perform 
the join
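To make this concrete, here is a minimal plain-Java sketch (not Flink API; the class and method names are hypothetical) of how a left join with condition = true could behave when the right side carries the single value produced by the inner query. It assumes the right input only ever holds one current value, so no window state is kept: every left record is emitted immediately, paired with the latest right value or with null.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical sketch: LEFT JOIN with condition = true, where the right input
// holds at most one "current" value (e.g. the output of SINGLE_VALUE).
// No window state is needed: each left record is paired with whatever the
// right side currently holds, or with null if nothing has arrived yet.
class TrueConditionLeftJoin<L, R, O> {
    private R currentRight = null;             // latest value from the right input
    private final BiFunction<L, R, O> combine; // builds the output row

    TrueConditionLeftJoin(BiFunction<L, R, O> combine) {
        this.combine = combine;
    }

    // Called for every record of the right (inner query) stream.
    void onRight(R value) {
        currentRight = value;
    }

    // Called for every record of the left (outer query) stream.
    // Left-join semantics: always emit, padding with null if needed.
    O onLeft(L value) {
        return combine.apply(value, currentRight);
    }

    public static void main(String[] args) {
        TrueConditionLeftJoin<Integer, String, String> join =
            new TrueConditionLeftJoin<>((amount, id) -> amount + "," + id);
        List<String> out = new ArrayList<>();
        out.add(join.onLeft(10)); // no right value yet, padded with null
        join.onRight("a");
        out.add(join.onLeft(20));
        join.onRight("b");
        out.add(join.onLeft(30));
        System.out.println(out); // [10,null, 20,a, 30,b]
    }
}
```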

- SINGLE_VALUE - this operator must provide exactly one value to be joined 
further. In the context of streaming, this value should evolve with the 
contents of the window. This could be implemented with a flatMap function, 
since the left join also allows the mapping to be done with null values
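A similarly hedged sketch of SINGLE_VALUE, assuming it keeps the usual scalar-subquery semantics (as Calcite's SINGLE_VALUE does: null for zero rows, the value for exactly one row, an error for more than one) and is evaluated over whatever rows are currently in the window. Window assignment itself is left out and the names are hypothetical; a null result is the case the left join handles by padding with null.

```java
import java.util.List;

// Hypothetical sketch of SINGLE_VALUE evaluated over the contents of one window.
// Scalar-subquery rule: zero rows -> null, one row -> that value,
// more than one row -> error.
final class SingleValueAgg {
    static <T> T singleValue(List<T> windowContents) {
        if (windowContents.size() > 1) {
            throw new IllegalStateException(
                "SINGLE_VALUE: more than one value in window (" + windowContents.size() + ")");
        }
        return windowContents.isEmpty() ? null : windowContents.get(0);
    }

    public static void main(String[] args) {
        List<String> empty = List.of();
        String none = singleValue(empty);
        String one = singleValue(List.of("a"));
        System.out.println(none); // null
        System.out.println(one);  // a
        try {
            singleValue(List.of("a", "b"));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```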

We can then extend this initial, simple implementation to support joins in 
general (conditional joins, right joins, ...), or we can isolate this 
implementation to the specific case of inner queries and go with a totally 
new design for stream-to-stream joins (which might be needed, depending on the 
decision about how to support the conditional mapping).

What do you think about this?

Example scenarios where this would apply:

SELECT STREAM amount,
(SELECT id FROM inputstream1) AS field1
FROM inputstream2

Translated to
LogicalProject(amount=[$1], c=[$4])
    LogicalJoin(condition=[true], joinType=[left])
      LogicalTableScan(table=[[inputstream1]])
      LogicalAggregate(group=[{}], agg#0=[SINGLE_VALUE($0)])
        LogicalProject(user_id=[$0])
          LogicalTableScan(table=[[inputstream2]])

Or from the same stream (perhaps interesting for applying more complex 
operations within the inner query):
SELECT STREAM amount,
(SELECT id FROM inputstream1) AS field1
FROM inputstream1

Translated to
LogicalProject(amount=[$1], c=[$4])
    LogicalJoin(condition=[true], joinType=[left])
      LogicalTableScan(table=[[inputstream1]])
      LogicalAggregate(group=[{}], agg#0=[SINGLE_VALUE($0)])
        LogicalProject(user_id=[$0])
          LogicalTableScan(table=[[inputstream1]])

Or used to perform the projection:
SELECT STREAM amount, c FROM (SELECT *, id AS c FROM inputstream1)

Translated to
  LogicalProject(amount=[$1], c=[$5])
    LogicalProject(time=[$0], amount=[$1], date=[$2], id=[$4], c=[$5])
      LogicalTableScan(table=[[inputstream1]])
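Since both LogicalProject nodes in this plan are stateless, they could be fused into a single per-record map. A plain-Java sketch, assuming a hypothetical schema (time, amount, date, id) for inputstream1; the record types and sample values are made up for illustration, and Java records need Java 16 or later.

```java
import java.util.function.Function;

// Hypothetical row types; the real schema of inputstream1 is an assumption.
record InputRow(long time, int amount, String date, String id) {}
record InnerRow(long time, int amount, String date, String id, String c) {}
record OutputRow(int amount, String c) {}

final class NestedProjection {
    // Inner query: SELECT *, id AS c FROM inputstream1
    static final Function<InputRow, InnerRow> INNER =
        r -> new InnerRow(r.time(), r.amount(), r.date(), r.id(), r.id());
    // Outer query: SELECT STREAM amount, c FROM (...)
    static final Function<InnerRow, OutputRow> OUTER =
        r -> new OutputRow(r.amount(), r.c());
    // Both projections are stateless, so they compose into one per-record map.
    static final Function<InputRow, OutputRow> FUSED = INNER.andThen(OUTER);

    public static void main(String[] args) {
        InputRow in = new InputRow(1000L, 42, "2016-01-01", "user-7");
        System.out.println(FUSED.apply(in)); // OutputRow[amount=42, c=user-7]
    }
}
```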


Or, in the future, even:
SELECT STREAM amount, myagg FROM (SELECT STREAM *, SUM(amount) OVER window AS 
myagg FROM inputstream1)
...
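For this last scenario the named window is not specified, so the following sketch assumes `window` means ROWS BETWEEN (size - 1) PRECEDING AND CURRENT ROW over the whole stream, without partitioning. It only illustrates that, unlike the previous cases, SUM ... OVER needs per-operator state (the buffered rows of the current window); the class name is hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: SUM(amount) OVER w, with w assumed to be
// ROWS BETWEEN (size - 1) PRECEDING AND CURRENT ROW on a single,
// unpartitioned stream. Each incoming row gets its myagg value.
final class RowsOverSum {
    private final int size;
    private final ArrayDeque<Long> buffer = new ArrayDeque<>();
    private long sum = 0;

    RowsOverSum(int size) {
        if (size < 1) throw new IllegalArgumentException("size must be >= 1");
        this.size = size;
    }

    // Returns myagg for the row that has just arrived.
    long onRow(long amount) {
        buffer.addLast(amount);
        sum += amount;
        if (buffer.size() > size) {
            sum -= buffer.removeFirst(); // evict the row that left the window
        }
        return sum;
    }

    public static void main(String[] args) {
        RowsOverSum over = new RowsOverSum(2);
        List<Long> out = new ArrayList<>();
        for (long amount : new long[] {5, 10, 20, 1}) {
            out.add(over.onRow(amount));
        }
        System.out.println(out); // [5, 15, 30, 21]
    }
}
```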


