Hi everybody,

thanks for the proposal, Radu.
If I understood it correctly, you are proposing a left join between a
stream and a single value (which is computed from a stream).
This makes sense and should be a common use case.

However, I think some of your example queries do not return a single value
as required for the join.

In your example:
> SELECT STREAM amount, (SELECT id FROM inputstream1) AS field1
> FROM inputstream2

you are suggesting that the subquery (SELECT id FROM  inputstream1) returns
a single value (I assume the last received value).
I would interpret the query differently and expect it to return the values
of all rows of the inputstream1 up to the current point in time.
IMO, a query like "SELECT a FROM inputstream ORDER BY time DESC LIMIT 1"
would capture the semantics better.
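
To make the two readings of the subquery concrete, here is a minimal Python
sketch. The rows and the helper names (all_ids, latest_id) are made up for
illustration and are not part of any Flink or Calcite API.

```python
# Hypothetical rows of inputstream1 as (time, id) pairs, in arrival order.
inputstream1 = [(1, "a"), (2, "b"), (3, "c")]


def all_ids(rows):
    """Reading 1: SELECT id FROM inputstream1 yields every row seen so far."""
    return [row_id for _, row_id in rows]


def latest_id(rows):
    """Reading 2: only the most recently received id, or None if no row yet."""
    if not rows:
        return None
    return max(rows, key=lambda row: row[0])[1]


# Replay the stream prefix by prefix and compare both readings.
for n in range(len(inputstream1) + 1):
    seen = inputstream1[:n]
    print(n, all_ids(seen), latest_id(seen))
```

Only the second reading produces a single value at every point in time, which
is what the join needs.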

The subquery
> (SELECT AVERAGE(amount) OVER (ORDER BY timestamp RANGE INTERVAL 1 HOUR
> PRECEDING) AS hour_sum FROM inputstream)

has a similar problem and would return one row for each record of
inputstream, i.e., not a single value.
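
As a rough illustration of why the OVER window yields one row per record, the
Python sketch below evaluates a one-hour range average for every input event.
The event data, the function name, and the assumption that the window bounds
are inclusive are mine, not taken from the proposal.

```python
def hourly_average_per_row(events, window_minutes=60):
    """Return one (timestamp, average) row per input event.

    Assumption: for an event at time ts, the window covers
    [ts - window_minutes, ts], with both bounds inclusive.
    `events` must be sorted by timestamp.
    """
    result = []
    for i, (ts, _) in enumerate(events):
        # Only events up to and including the current one can be in the window.
        in_window = [amt for t, amt in events[: i + 1] if ts - window_minutes <= t]
        result.append((ts, sum(in_window) / len(in_window)))
    return result


# Hypothetical (minute, amount) events, sorted by time.
events = [(0, 100), (30, 10), (70, 40)]
rows = hourly_average_per_row(events)
print(rows)
```

The result has as many rows as the input, so the subquery on its own cannot
serve as the single-value side of the join.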

Anyway, if we get the semantics of the query that computes the single value
right, I think this type of join should be well covered by the dynamic
table proposal.
The single value input will be a dynamic table (of constant size = 1) which
is continuously updated by the engine.
Joining this table to a dynamic (append) table will result in a
continuously growing dynamic table, which can be emitted as a stream.
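
A minimal Python sketch of this join, under the assumption that each appended
row is paired with the single value that is current when the row arrives and
that earlier result rows are not revised afterwards. The event tags
("append", "update") and the function name are hypothetical.

```python
def join_with_single_value(events):
    """Left-join an append stream with a size-1 table that is updated in place.

    `events` is a merged, time-ordered list of ("append", row) and
    ("update", value) tuples; both tags are invented for this sketch.
    """
    single_value = None  # the dynamic table of constant size 1
    output = []          # the continuously growing result table
    for kind, payload in events:
        if kind == "update":
            # The engine overwrites the single value; nothing is emitted.
            single_value = payload
        else:
            # Left join with condition true; None plays the role of NULL.
            output.append((payload, single_value))
    return output


events = [
    ("append", "User1"),
    ("update", 10),
    ("append", "User2"),
    ("update", 5),
    ("append", "User3"),
]
result = join_with_single_value(events)
print(result)
```

Note that the only state kept is the one current value, so the result grows
with the append side while the join itself needs no window or buffer.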

This would look very similar to what you proposed, but we would need to make
sure that the single value query actually returns a single value.

@Xingcan Thanks for your feedback.
I would suggest moving the general discussion about the dynamic table
proposal to the thread that Radu started (I responded there a few minutes
ago).

Just a few comments here: By logically converting a stream into a dynamic
table we have well-defined semantics for operations such as aggregations
and joins.
However, you are right that this does not mean that we can efficiently
apply all operations on dynamic tables that we can apply on an actual batch
table. Some operations are just too expensive or require too much state to
be performed in a streaming fashion. So yes, there will be some
restrictions, but that is due to the nature of stream processing rather than
to the idea of dynamic tables, IMO.

Best,
Fabian


2017-01-26 11:33 GMT+01:00 Xingcan <xingc...@gmail.com>:

> Hi all,
>
> I've read the document about dynamic tables. Honestly, I think it's
> well-defined and ingeniously reconciles batch and stream. I have two
> questions about the design.
>
> 1) Though it's fine to take the stream as a snapshot of a dynamic table, a
> table is essentially a set while a stream is essentially an ordered list
> (with xxTime). I'm not sure if the operations on a set will all be suitable
> for a list (e.g., union or merge?). Of course, we can add an "order by time"
> to all SQL instances, but will it be suitable?
>
> 2) As Radu said, I also think inner queries are essential for a query
> language. (I didn't see any select from (select) in the document). The
> problem is that SQL is based on closure, while we cannot prove
> that for a stream. Can the result from a stream operation be another input?
> It depends. The window operator will convert "point of time" events to
> "period of time" events, and I don't know if the nature of the data has
> changed. Also, the partial emission will lead to heterogeneous results.
>
> BTW, the "Emission of dynamic tables" section seems to be a little
> incompatible with the whole document...
>
> Best,
> Xingcan
>
> On Thu, Jan 26, 2017 at 6:13 PM, Radu Tudoran <radu.tudo...@huawei.com>
> wrote:
>
> > Hi Shaoxuan,
> >
> > Thanks for the feedback!
> > Regarding the proposal for relational queries that you referenced, I am a
> > bit confused about its purpose and evolution with respect to the
> > current implementation of stream SQL: is it supposed to replace this
> > implementation or to complement it? I will send another email about
> > this, as I guess this can be a standalone discussion thread.
> >
> > Also, regarding the stream-to-stream join, I intend to start another
> > discussion about this so that we can decide all together if we can start
> > some implementation/design now or we need to wait.
> >
> > Now, regarding the inner queries and the points you raised. It is true
> > that in general an inner join would work like any other join (which
> > obviously requires some buffering capabilities and mechanisms to restrict
> > the infinite growth of the join state). However, at least for
> > some cases of inner queries we can support them without the need
> > for buffering mechanisms or full support for inner join / left join.
> > Basically, the logical operator into which an inner query is translated
> > (a left join with an always-true condition) is to some extent more similar
> > to UNION, and to the union implementation, than to the implementation we
> > will have for the joins. This is why I believe we can already provide the
> > support for this (I also tested a PoC implementation internally for this
> > and it works).
> > In terms of examples of when we could use this, please see the next 2
> > examples. Please let me know what you think and whether it is worth
> > designing the jira issue, perhaps with some more details (including the
> > technical details).
> >
> > Consider the example below:
> >
> > SELECT STREAM user
> >         FROM inputstream
> >         WHERE amount > (SELECT STREAM Min(amount2) FROM inputstream2)
> >
> > The point of this is to restrict the values you are selecting based on
> > some value that you have from the other stream. Consider the values below
> > that come in each stream
> >
> > inputstream   inputstream2   Result
> > User1, 100                   User1 (there is no value in inputstream2
> >                              and the left join should not restrict
> >                              the output in this case)
> >               X, x, 10       nothing (there is no event in inputstream
> >                              to be output); Min is now 10
> > User2, 20                    User2 (20 is greater than 10, the minimum
> >                              retained from inputstream2)
> >               X, x, 20       nothing (there is no event in inputstream
> >                              to be output); Min remains 10
> >               X, x, 5        nothing (there is no event in inputstream
> >                              to be output); Min is now 5
> > User3, 8                     User3 (8 is greater than 5)
> > ....
> >
> >
> > The ultimate goal of this is, among other things, to be able to define
> > multiple window computations on the same input stream. Consider:
> >
> > SELECT STREAM user
> >         FROM inputstream
> >         WHERE (SELECT STREAM AVERAGE(amount) OVER (ORDER BY timestamp
> > RANGE INTERVAL 1 HOUR PRECEDING) AS hour_sum FROM inputstream) < amount
> >
> >
> > Assume you have the following events, each arriving 30 minutes apart:
> > User1, 100   -> Average is 100 and the output of the topology that
> > implements the query is NULL (no output as 100 is not greater than 100)
> > User2, 10    -> Average is 55 and the output of the topology that
> > implements the query is NULL (no output as 10 is not greater than 55)
> > User3, 40    -> Average is 25 ((10+40)/2) and the output of the topology
> > that implements the query is User3 (40 is greater than 25)
> > ....
> > Although the query as it is depends on aggregates and windows, the
> > operator to implement the inner query can be implemented independently of
> > the functions that are contained in the query. Also, there is no need for a
> > window or buffering to implement the logic for assembling the results from
> > the inner query.
> >
> >
> > Best regards,
> >
> > -----Original Message-----
> > From: Shaoxuan Wang [mailto:wshaox...@gmail.com]
> > Sent: Thursday, January 26, 2017 4:36 AM
> > To: dev@flink.apache.org
> > Subject: Re: STREAM SQL inner queries
> >
> >  Hi Radu,
> > Similar to the stream-stream join, this stream-stream inner query does not
> > seem to be well defined. It needs to provide at least some kind of window
> > bounds to complete the streaming SQL semantics. If this is an unbounded
> > join/select, a mechanism for storing the infinite data has to be
> > considered. I may not fully understand your proposal. Could you please
> > provide more details about this inner query, say by giving some examples of
> > input and output? It would also be great if you could explain the use case
> > of this inner query. This helps us to understand the semantics.
> >
> > It should also be noted that we have recently decided to unify stream and
> > batch queries with the same regular (batch) SQL. Therefore we have removed
> > the support for the STREAM keyword in Flink Streaming SQL. In the past
> > several months, Fabian and Xiaowei Jiang have started to work on the future
> > Relational Queries on Flink streaming. Fabian has drafted a very good
> > design doc, https://goo.gl/m31kkE. The design is based on a new concept
> > of a dynamic table whose content changes over time and which can therefore
> > be derived from streams. With this dynamic table, stream queries can be
> > done via regular (batch) SQL. Besides some syntactic sugar, there is not
> > much difference between a batch query and a stream query (in terms of the
> > what and where of query execution). A stream query has additional
> > characteristics regarding when to emit a result and how to refine the
> > result considering retraction.
> >
> > Hope this helps and look forward to working with you on streaming SQL.
> >
> > Regards,
> > Shaoxuan
> >
> >
> > On Wed, Jan 25, 2017 at 9:49 PM, Radu Tudoran <radu.tudo...@huawei.com>
> > wrote:
> >
> > > Hi all,
> > >
> > > I would like to open a jira issue (and then provide the
> > > implementation) for supporting inner queries. The idea is to be able
> > > to support SQL queries as the ones presented in the scenarios below.
> > > The key idea is that supporting inner queries would require
> > > implementations for:
> > >
> > > -> JOIN (type = left and condition = true) - Basically this is a simple
> > > implementation for a join function between 2 streams that does not
> > > require any window support behind the scenes as there is no condition
> > > on which to perform the join
> > >
> > > -> SINGLE_VALUE - this operator would need to provide one value to
> > > be joined further. In the context of streaming this value should
> > > basically evolve with the contents of the window. This could be
> > > implemented with a flatmap function, as left joins would also allow
> > > the mapping with null values
> > >
> > > We can then extend this initial and simple implementation to provide
> > > support for joins in general (conditional joins, right joins..) or we
> > > can isolate this implementation for this specific case of inner
> > > queries and go with a totally new design for stream to stream joins
> > > (might be needed depending on what is the decision behind on how to
> > > support the conditional
> > > mapping)
> > >
> > > What do you think about this?
> > >
> > > Examples of scenarios to apply
> > >
> > > SELECT STREAM amount,
> > > (SELECT id FROM  inputstream1) AS field1 FROM inputstream2
> > >
> > > Translated to
> > > LogicalProject(amount=[$1], c=[$4])
> > >     LogicalJoin(condition=[true], joinType=[left])
> > >       LogicalTableScan(table=[[inputstream1]])
> > >       LogicalAggregate(group=[{}], agg#0=[SINGLE_VALUE($0)])
> > >         LogicalProject(user_id=[$0])
> > >           LogicalTableScan(table=[[inputstream2]])
> > >
> > > Or from the same stream - perhaps interesting for applying some more
> > > complex operations within the inner query
> > >
> > > SELECT STREAM amount,
> > > (SELECT id FROM  inputstream1) AS field1 FROM inputstream1
> > >
> > > Translated to
> > > LogicalProject(amount=[$1], c=[$4])
> > >     LogicalJoin(condition=[true], joinType=[left])
> > >       LogicalTableScan(table=[[inputstream1]])
> > >       LogicalAggregate(group=[{}], agg#0=[SINGLE_VALUE($0)])
> > >         LogicalProject(user_id=[$0])
> > >           LogicalTableScan(table=[[inputstream1]])
> > >
> > > Or used to do the projection
> > > SELECT STREAM amount, c  FROM (SELECT *,id AS c FROM inputstream1)
> > >
> > > Translated to
> > >   LogicalProject(amount=[$1], c=[$5])
> > >     LogicalProject(time=[$0], amount =[$1], date=[$2], id =[$4],
> > >                    c=[$5])
> > >       LogicalTableScan(table=[[inputstream1]])
> > >
> > >
> > > Or in the future even
> > > SELECT STREAM amount, myagg FROM  (SELECT STREAM *, SUM(amount) OVER
> > > window AS myagg FROM inputstream1)) ...
> > >
> > >
> > >
> > >
> >
>
