Hi Radu,

Updates of the result (materialized view) are not always simple appends. If
the query is a non-windowed aggregation or a windowed aggregation (or join)
with late data, some parts of the result need to be removed or updated.
I think in order to implement the second option, we would need to emit the
complete result for every update because we do not know which parts of the
previous view became invalid. This is not practical, because it would mean
holding the complete result as state and emitting the complete result for
every update.

In contrast, the first option sends retraction and update records to update
the latest view.
Moreover, we only need to hold those results as state that might be updated
and not the complete result.
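
A minimal sketch of the first option for a non-windowed COUNT per key may help here. It is an illustration only, not Flink's API: the class `RetractingCount`, the helper `apply_changes`, and the `("-", ...)` / `("+", ...)` record encoding are all hypothetical names chosen for this example.

```python
from collections import defaultdict


class RetractingCount:
    """Non-windowed COUNT(*) GROUP BY key emitting retraction/update records."""

    def __init__(self):
        # Only the current count per key is held as state,
        # not the complete history of emitted results.
        self.counts = defaultdict(int)

    def process(self, key):
        out = []
        old = self.counts[key]
        if old > 0:
            # Retract the previously emitted row, which is no longer valid.
            out.append(("-", key, old))
        self.counts[key] = old + 1
        # Emit the updated row.
        out.append(("+", key, old + 1))
        return out


def apply_changes(view, changes):
    """Apply retraction/update records to a materialized view (a set of rows)."""
    for sign, key, count in changes:
        if sign == "-":
            view.remove((key, count))
        else:
            view.add((key, count))
    return view


op = RetractingCount()
view = set()
for k in ["a", "b", "a"]:
    apply_changes(view, op.process(k))
print(sorted(view))
```

The receiver of the records only touches the rows that changed; the complete result is never re-sent.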

I agree that the discussion helps a lot.

Best, Fabian

2017-01-30 15:49 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>:

> Hi,
>
> I would like to ask for a further clarification about the statement:
> " a streaming query should be equivalent to the result of a batch query
> that is executed on the materialized stream "
>
> I do agree with the principle, but the question that I would like to ask
> is how we interpret the relation between a stream and the materialized
> view of the stream at some point. If we consider that we materialize the
> view on the elements we received on the stream until moment X (let's say it
> had elements 1 2 3) and we apply an SQL query, indeed this should give the
> exact same result as if 1 2 3 were in a database/batch and we applied the
> same logic. However, some time later in the future, if we receive another
> element (e.g. 4), do we have the same materialized view, which we update, or
> do we consider it a new state, a new materialized view, and therefore a new
> scenario? Basically, assume we take the last value. Then we can have two
> options:
>
> Option 1) At moment X the output is 3 (the last value of the materialized
> view of 1 2 3 is 3) and then at moment X+1, when 4 arrives, the last value
> remains unchanged (it is the same materialized view)
> Option 2) At moment X the output is 3 (the last value of the materialized
> view of 1 2 3 is 3) and then at moment X+1, when 4 arrives, the last value
> is modified to 4 (it is a new materialized view and the output is as if we
> applied the SQL query in the batch case with all elements 1 2 3 4)
>
> I would assume (based on previous discussions and the panel in flink
> forward) that we rather go for option 2. The correct output of a SQL query
> on a stream is that one would create a materialized view at that point in
> time and apply the query in batch mode. When a new element arrives (stream
> evolves) then we will get a new materialized view.
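
Option 2 can be sketched in a few lines of plain Python. This is only an illustration of the semantics under discussion, with hypothetical function names: the continuous query re-evaluates the batch query on the stream materialized so far, so its result after each element equals the batch result on that prefix.

```python
def last_value_batch(rows):
    """Batch query: last value of a fully materialized table (None if empty)."""
    return rows[-1] if rows else None


def last_value_continuous(stream):
    """Continuous query: yield the updated result after every arriving element."""
    materialized = []
    for element in stream:
        # Each arrival produces a new materialized view of the stream so far.
        materialized.append(element)
        yield last_value_batch(materialized)


results = list(last_value_continuous([1, 2, 3, 4]))
print(results)
```

At moment X (after 1 2 3) the result is 3, and once 4 arrives the result becomes 4, as described for option 2.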
>
> If my assumption is correct, then I would say that SINGLE_VALUE should be
> continuously updated as the stream on top of which it is applied
> evolves.
>
> My 2cents (anyway - I think the discussion is very useful and hopefully
> applicable also for other operators/scenarios that we are going to
> implement)
>
>
>
>
> -----Original Message-----
> From: Fabian Hueske [mailto:fhue...@gmail.com]
> Sent: Monday, January 30, 2017 2:09 PM
> To: dev@flink.apache.org
> Subject: Re: STREAM SQL inner queries
>
> Hi Radu,
>
> I think it is most important to get the semantics of a streaming query
> right.
> In my opinion, the result of a streaming query should be equivalent to the
> result of a batch query that is executed on the materialized stream.
> It should not matter whether you append the records received from a Kafka
> topic to a table and execute a batch query on that table or if you do run
> the same query continuously on the Kafka topic.
>
> It is correct, that some queries become too expensive to compute if we
> implement these semantics.
> However, this would be the price to pay for stream-batch consistent
> semantics.
>
> Regarding the inner query case. I think a query should yield the same
> result, regardless of whether it is an inner or outer query.
> This is one of the core principles of SQL, that I would not change.
>
> Best, Fabian
>
> 2017-01-30 12:54 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>:
>
> > Hi Fabian,
> >
> > Thanks for the link and for the remarks.
> >
> > I do not imagine the behavior of the inner query necessarily along the
> > lines you describe. I specifically refer to "This applies as well for
> > the inner query. However, as the result of the inner query evolves,
> > also the result of the join needs to be constantly recomputed. Hence,
> > for every new result of (SELECT x FROM input1 ORDER BY time LIMIT 1),
> > we would need to emit an update for each record that was joined before."
> >
> > If we consider such a working scenario, then the behavior would be
> > something like the one below, if I understand correctly. Take for
> > example the query "SELECT STREAM amount, (SELECT id FROM inputstream1) AS
> > field1 FROM inputstream2"
> >
> > Stream1     Stream2     Output
> >             Id1
> > User1,10                (10,Id1)
> > User2,11                (11,Id2)
> >             Id3         (10,Id3), (11,Id3)
> > User3,9                 (9,Id3)
> > ...
> >
> > ...regardless of how we express the logic of the inner query (do we
> > use LIMIT 1, we don't....), I would expect that the outputs that were
> > emitted are not retracted or modified in the future. In the previous
> > example the updates (10,Id3), (11,Id3) should never happen. With this in
> > mind, although the inner query is translated to a LogicalJoin operator,
> > the functionality is more similar to a union or a coFlatMap, where we
> > only use one input as the holder for what to associate in the future
> > with the other. Anyway, I do not see the need to have any buffers (as
> > for the general case of joins) to compute the content for creating the
> > output from the inner query.
> >
> > Regarding your previous comment about failing based on SINGLE_VALUE
> > verification: this is also something we just need to agree on. After all,
> > as the implementation is decoupled from the parsing of the query, we can
> > implement any of the behaviors: either throw an error when a second
> > element or update arrives in the second stream, or just update
> > the single value state for future use.
> >
> > All in all, I think we just need to clarify what behavior to expect.
> > Please let me know what you think.
> >
> > I agree with the approach of starting small - even with some very
> > limited cases when we support inner queries and then extend or define
> > the general cases.
> >
> >
> > Dr. Radu Tudoran
> > Senior Research Engineer - Big Data Expert IT R&D Division
> >
> >
> > HUAWEI TECHNOLOGIES Duesseldorf GmbH
> > European Research Center
> > Riesstrasse 25, 80992 München
> >
> > E-mail: radu.tudo...@huawei.com
> > Mobile: +49 15209084330
> > Telephone: +49 891588344173
> >
> > HUAWEI TECHNOLOGIES Duesseldorf GmbH
> > Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
> > Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
> > Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
> > Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
> > Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
> > This e-mail and its attachments contain confidential information from
> > HUAWEI, which is intended only for the person or entity whose address is
> > listed above. Any use of the information contained herein in any way
> > (including, but not limited to, total or partial disclosure,
> reproduction,
> > or dissemination) by persons other than the intended recipient(s) is
> > prohibited. If you receive this e-mail in error, please notify the sender
> > by phone or email immediately and delete it!
> >
> > -----Original Message-----
> > From: Fabian Hueske [mailto:fhue...@gmail.com]
> > Sent: Monday, January 30, 2017 12:33 PM
> > To: dev@flink.apache.org
> > Subject: Re: STREAM SQL inner queries
> >
> > Hi Radu,
> >
> > I thought about your join proposal again and think there is an issue with
> > the semantics.
> >
> > The problem is that the result of a query is recomputed as new data
> > arrives in the dynamic table.
> > This applies as well for the inner query. However, as the result of the
> > inner query evolves, also the result of the join needs to be constantly
> > recomputed. Hence, for every new result of (SELECT x FROM input1 ORDER BY
> > time LIMIT 1), we would need to emit an update for each record that was
> > joined before.
> >
> > In order to prevent this, the join would need to have a time-based join
> > predicate defining that a tuple of the outer query should join with the
> > current value of the inner query at the time of its own timestamp. Such a
> > predicate can be expressed in SQL but this is quite cumbersome.
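
The effect of such a time-based predicate can be sketched outside SQL. The Python below is a hypothetical illustration (the class `VersionedValue` and its methods are invented for this example, and it assumes the inner query's updates arrive in timestamp order): each outer tuple joins with the version of the inner value that was current at the tuple's own timestamp, so later updates of the inner query never change rows that were already joined.

```python
import bisect


class VersionedValue:
    """Keeps every version of the inner query result, ordered by timestamp."""

    def __init__(self):
        self.timestamps = []
        self.values = []

    def update(self, ts, value):
        # Assumption: updates arrive in non-decreasing timestamp order.
        self.timestamps.append(ts)
        self.values.append(value)

    def as_of(self, ts):
        """Return the value that was current at time ts, or None if none existed."""
        i = bisect.bisect_right(self.timestamps, ts)
        return self.values[i - 1] if i > 0 else None


inner = VersionedValue()
inner.update(1, "Id1")
inner.update(5, "Id3")

outer = [(2, 10), (3, 11), (6, 9)]  # (timestamp, amount)
joined = [(amount, inner.as_of(ts)) for ts, amount in outer]
print(joined)
```

The price is that the versions of the inner value have to be kept as state for as long as outer tuples with older timestamps can still arrive.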
> >
> > Julian Hyde (Apache Calcite committer) discussed similar use cases in a
> > document and proposed something called Temporal Tables [1].
> > In some sense, the proposed dynamic tables are a special case of temporal
> > table always reflecting the current point in time (i.e., not a previous
> > point in time).
> >
> > Best, Fabian
> >
> > [1]
> > https://docs.google.com/document/d/1RvnLEEQK92axdAaZ9XIU5szpkbGqFMBtzYiIY4dHe0Q
> >
> > 2017-01-27 21:13 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:
> >
> > > Hi Radu,
> > >
> > > I was a bit surprised that Calcite's parser accepted your query.
> > > Hence, I checked the Calcite plan and had a look at the documentation
> > > of Calcite's SqlSingleValueAggFunction:
> > >
> > > > SINGLE_VALUE aggregate function returns the input value if there is
> > > > only one value in the input; Otherwise it triggers a run-time error.
> > >
> > > So, the query would only pass if the inner query returns a single
> > > value (which would usually not be the case of a stream).
> > > The SINGLE_VALUE check is not added to the plan if the inner query is
> > > guaranteed to return a single value (LIMIT 1, global aggregation).
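
The behavior described in the quoted documentation can be sketched as follows. This is plain Python for illustration, not Calcite code; the function name `single_value` is hypothetical, and returning `None` for an empty input is an assumption, since the quoted sentence does not cover that case.

```python
_EMPTY = object()  # sentinel to distinguish "no value seen" from a None value


def single_value(values):
    """Return the only value in `values`; fail if there is more than one."""
    result = _EMPTY
    for v in values:
        if result is not _EMPTY:
            # A second value arrived: this is the run-time error case.
            raise RuntimeError("SINGLE_VALUE: more than one value in the input")
        result = v
    # Assumption: an empty input yields None (SQL NULL).
    return None if result is _EMPTY else result


print(single_value([42]))
```

On an unbounded stream the error case is reached as soon as a second row arrives, which is why the unrestricted inner query would usually fail.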
> > >
> > > Anyway, I agree that we could start to add the simple cases of these
> > > joins for processing time.
> > > For event-time, I think we need to consider late arriving data and
> > > support retraction values.
> > >
> > > Best, Fabian
> > >
> > > 2017-01-27 10:43 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>:
> > >
> > >> Hi all,
> > >>
> > >> Thanks for the feedback!
> > >>
> > >> I agree that we should get the semantics right and after that we can
> > >> implement it. I think it would be quite useful. Now, regarding the
> > >> remarks you made:
> > >>
> > >>
> > >> "> SELECT STREAM amount, (SELECT id FROM  inputstream1) AS field1
> > >> FROM inputstream2
> > >> you are suggesting that the subquery (SELECT id FROM  inputstream1)
> > >> returns a single value (I assume the last received value). I would
> > >> interpret the query differently and expect it to return the values of
> > >> all rows of the inputstream1 up to the current point in time.
> > >> "
> > >>
> > >> It is a good point. It is not so much that I wanted to suggest that
> > >> this should be the syntax to use - I just relied basically on the
> > >> logical operators that Calcite has parsed the query into (JOIN +
> > >> SINGLE_VALUE). Based on this logical translation I would say the
> > >> correct implementation for this translation is to return one value,
> > >> not necessarily the whole content of the stream. Anyway, we are not
> > >> restricted to this as we could potentially use different rules in
> > >> Calcite to alter the resulting plan.
> > >>
> > >> However, if we decide that such queries should return the whole
> > >> stream rather than a single value, we are indeed tapping into the
> > >> problem of potentially unbounded cases. For this I do agree that the
> > >> approach you proposed to rely on dynamic tables is very good. In such
> > >> a case we would just pass to the upper operators the entire content of
> > >> the dynamic table. For that matter it works also for the single value
> > >> (as the table would contain only one value). However, for the simple
> > >> case of returning a single value we can provide even now an
> > >> implementation and we do not need to wait until the full functionality
> > >> of dynamic tables is provided.
> > >>
> > >> At the same time I also agree that the syntax "SELECT a FROM inputstream
> > >> ORDER BY time LIMIT 1" is elegant. I have no issue with considering the
> > >> case of inner queries to be translated like this only when they
> > >> have the "LIMIT 1" specified or directly only when they are provided in
> > >> such a form.
> > >>
> > >> I will wait for additional remarks in order for all of us to agree on a
> > >> specific semantic and then I will push this into a JIRA issue to be
> > >> further reviewed and validated.
> > >>
> > >> Best regards,
> > >>
> > >>
> > >>
> > >> Dr. Radu Tudoran
> > >>
> > >> -----Original Message-----
> > >> From: Fabian Hueske [mailto:fhue...@gmail.com]
> > >> Sent: Thursday, January 26, 2017 4:51 PM
> > >> To: dev@flink.apache.org
> > >> Subject: Re: STREAM SQL inner queries
> > >>
> > >> Hi everybody,
> > >>
> > >> thanks for the proposal Radu.
> > >> If I understood it correctly, you are proposing a left join between a
> > >> stream and a single value (which is computed from a stream).
> > >> This makes sense and should be a common use case.
> > >>
> > >> However, I think some of your example queries do not return a single
> > >> value as required for the join.
> > >>
> > >> In your example:
> > >> > SELECT STREAM amount, (SELECT id FROM  inputstream1) AS field1 FROM
> > >> > inputstream2
> > >>
> > >> you are suggesting that the subquery (SELECT id FROM  inputstream1)
> > >> returns a single value (I assume the last received value).
> > >> I would interpret the query differently and expect it to return the
> > >> values of all rows of the inputstream1 up to the current point in
> > >> time.
> > >> IMO, a query like "SELECT a FROM inputstream ORDER BY time LIMIT 1"
> > >> would capture the semantics better.
> > >>
> > >> The subquery
> > >> > (SELECT AVERAGE(amount) OVER (ORDER BY timestamp RANGE INTERVAL 1
> > >> > HOUR PRECEDING) AS hour_sum FROM inputstream)
> > >>
> > >> has a similar problem and would return one row for each record of
> > >> inputstream, i.e., not a single value.
> > >>
> > >> Anyway, if we get the semantics of the query that computes the single
> > >> value right, I think this type of join should be well covered by the
> > >> dynamic table proposal.
> > >> The single value input will be a dynamic table (of constant size = 1)
> > >> which is continuously updated by the engine.
> > >> Joining this table to a dynamic (append) table will result in a
> > >> continuously growing dynamic table, which can be emitted as a stream.
> > >>
> > >> This would look very similar to what you proposed but we would need to
> > >> make sure that the single value query actually returns a single value.
> > >>
> > >> @Xingcan Thanks for your feedback.
> > >> I would suggest to move the general discussion about the dynamic table
> > >> proposal to the thread that Radu started (I responded there a few
> > >> minutes ago).
> > >>
> > >> Just a few comments here: By logically converting a stream into a
> > >> dynamic table we have well defined semantics for operations such as
> > >> aggregations and joins.
> > >> However, you are right that this does not mean that we can efficiently
> > >> apply all operations on dynamic tables that we can apply on an actual
> > >> batch table. Some operations are just too expensive or require too much
> > >> state to be performed in a streaming fashion. So yes, there will be some
> > >> restrictions but that is rather due to the nature of stream processing
> > >> than to the idea of dynamic tables, IMO.
> > >>
> > >> Best,
> > >> Fabian
> > >>
> > >>
> > >> 2017-01-26 11:33 GMT+01:00 Xingcan <xingc...@gmail.com>:
> > >>
> > >> > Hi all,
> > >> >
> > >> > I've read the document about dynamic tables. Honestly, I think it's
> > >> > well-defined and ingeniously reconciles batch and stream. There
> > >> > are two questions about the design.
> > >> >
> > >> > 1) Though it's fine to take the stream as a snapshot of a dynamic
> > >> > table, a table is essentially a set while a stream is essentially an
> > >> > ordered list (with xxTime). I'm not sure if the operations on a set
> > >> > will all suit a list (e.g. union or merge?). Of course, we can add
> > >> > an "order by time" to all SQL instances, but will it be suitable?
> > >> >
> > >> > 2) As Radu said, I also think inner query is essential for a query
> > >> > language. (I didn't see any select from (select) in the document). The
> > >> > problem is, SQL is based on a closure theory while we can not
> > >> > prove that for a stream. Can the result from a stream operation be
> > >> > another input?
> > >> > It depends. The window operator will convert "point of time" events to
> > >> > "period of time" events and I don't know if the nature of the data has
> > >> > changed. Also, the partial emission will lead to heterogeneous
> > >> > results.
> > >> >
> > >> > BTW, the "Emission of dynamic tables" section seems to be a little
> > >> > incompatible with the whole document...
> > >> >
> > >> > Best,
> > >> > Xingcan
> > >> >
> > >> > On Thu, Jan 26, 2017 at 6:13 PM, Radu Tudoran
> > >> > <radu.tudo...@huawei.com>
> > >> > wrote:
> > >> >
> > >> > > Hi Shaoxuan,
> > >> > >
> > >> > > Thanks for the feedback!
> > >> > > Regarding the proposal for relational queries that you referenced, I
> > >> > > am a bit confused with respect to its purpose and evolution with
> > >> > > respect to the current implementation of stream SQL - is it supposed
> > >> > > to replace this implementation, to complement it.... but I will send
> > >> > > another email about this as I guess this can be a standalone
> > >> > > discussion thread
> > >> > >
> > >> > > Also, regarding the join stream-to-stream I intend to start another
> > >> > > discussion about this such that we can decide all together if we can
> > >> > > start some implementation/design now or we need to wait.
> > >> > >
> > >> > > Now, regarding the inner queries and the points you raised. It is
> > >> > > true that in general an inner join would work like any other join
> > >> > > (which obviously requires some buffering capabilities and mechanisms
> > >> > > to restrict the infinite growth for the join state composition).
> > >> > > However, at least for some cases of inner queries we can support
> > >> > > them without the need for buffering mechanisms or full support for
> > >> > > inner join / left join.
> > >> > > Basically the logical operator into which an inner query is
> > >> > > translated (left join with an always true condition) is to some
> > >> > > extent more similar to UNION, and the union implementation, than to
> > >> > > the implementation we will have for the joins. This is why I believe
> > >> > > we can already provide the support for this (I also tested a PoC
> > >> > > implementation internally for this and it works).
> > >> > > In terms of examples when we could use this, please see the next 2
> > >> > > examples. Please let me know what you think and whether it is
> > >> > > worth designing the JIRA issue perhaps with some more details
> > >> > > (including the technical details).
> > >> > >
> > >> > > Consider the example below:
> > >> > >
> > >> > > SELECT STREAM user
> > >> > >         FROM inputstream
> > >> > >         WHERE amount > (SELECT STREAM Min(amount2) FROM inputstream2)
> > >> > >
> > >> > > The point of this is to restrict the values you are selecting based
> > >> > > on some value that you have from the other stream. Consider the
> > >> > > values below that come in each stream
> > >> > >
> > >> > > Inputstream   inputstream2   Result
> > >> > > User1,100                    user1 (because there is no value in
> > >> > >                              inputstream2 and the left join should
> > >> > >                              not restrict the output in this case)
> > >> > >               X,x,10         nothing, as there is no event in
> > >> > >                              inputstream to be outputted. Min will
> > >> > >                              become from now 10
> > >> > > User2,20                     user2 (because 20 is greater than 10,
> > >> > >                              which is the minimum retained in
> > >> > >                              inputstream2)
> > >> > >               X,x,20         nothing, as there is no event in
> > >> > >                              inputstream to be outputted. Min will
> > >> > >                              remain from now 10
> > >> > >               X,x,5          nothing, as there is no event in
> > >> > >                              inputstream to be outputted. Min will
> > >> > >                              become from now 5
> > >> > > User3,8                      User3 (because 8 is greater than 5)
> > >> > > ....
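
The table above can be replayed with a small Python sketch. It follows the behavior described in this message (results are emitted once and never retracted, and an empty inner stream does not restrict the output), which is the proposal under discussion rather than agreed semantics. The function name and the event encoding are hypothetical.

```python
def filter_by_running_min(events):
    """Replay interleaved events; emit users whose amount exceeds the running min."""
    current_min = None  # single value of state: Min(amount2) seen so far
    emitted = []
    for source, payload in events:
        if source == "inputstream2":
            # Update the single-value state; nothing is emitted.
            if current_min is None or payload < current_min:
                current_min = payload
        else:
            user, amount = payload
            # Assumption from the table: with no min yet, the record passes.
            if current_min is None or amount > current_min:
                emitted.append(user)
    return emitted


events = [
    ("inputstream", ("User1", 100)),
    ("inputstream2", 10),
    ("inputstream", ("User2", 20)),
    ("inputstream2", 20),
    ("inputstream2", 5),
    ("inputstream", ("User3", 8)),
]
print(filter_by_running_min(events))
```

Only the current minimum is kept as state; no records of either stream are buffered.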
> > >> > >
> > >> > >
> > >> > > The goal for the final usage of this is to be able, among others, to
> > >> > > define multiple window processing on the same input stream. Consider:
> > >> > >
> > >> > > SELECT STREAM user
> > >> > >         FROM inputstream
> > >> > >         WHERE (SELECT STREAM AVERAGE(amount) OVER (ORDER BY
> > >> > > timestamp RANGE INTERVAL 1 HOUR PRECEDING) AS hour_sum FROM
> > >> > > inputstream) < amount
> > >> > >
> > >> > >
> > >> > > Assume you have the following events each coming every 30 minutes
> > >> > > User1, 100   -> Average is 100 and the output of the topology that
> > >> > > implements the query is NULL (no output as 100 is not greater than
> > >> > > 100)
> > >> > > User2, 10    -> Average is 55 and the output of the topology that
> > >> > > implements the query is NULL (no output as 10 is not greater than 55)
> > >> > > User3, 40    -> Average is 25 ((10+40)/2) and the output of the
> > >> > > topology that implements the query is User3 (40 is greater than 25)
> > >> > > ....
> > >> > > Although the query as it is depends on aggregates and windows, the
> > >> > > operator to implement the inner query can be implemented
> > >> > > independently of the functions that are contained in the query. Also,
> > >> > > there is no need for a window or buffering to implement the logic
> > >> > > for assembling the results from the inner query.
> > >> > >
> > >> > >
> > >> > > Best regards,
> > >> > >
> > >> > > -----Original Message-----
> > >> > > From: Shaoxuan Wang [mailto:wshaox...@gmail.com]
> > >> > > Sent: Thursday, January 26, 2017 4:36 AM
> > >> > > To: dev@flink.apache.org
> > >> > > Subject: Re: STREAM SQL inner queries
> > >> > >
> > >> > >  Hi Radu,
> > >> > > Similar to the stream-stream join, this stream-stream inner query
> > >> > > does not seem to be well defined. It needs to provide at least some
> > >> > > kind of window bounds to complete the streaming SQL semantics. If
> > >> > > this is an unbounded join/select, a mechanism of how to store the
> > >> > > infinite data has to be considered. I may not fully understand your
> > >> > > proposal. Could you please provide more details about this inner
> > >> > > query, say giving some examples of input and output. It would be
> > >> > > also great if you can explain the use case of this inner query.
> > >> > > This helps us to understand the semantics.
> > >> > >
> > >> > > It should also be noted that we have recently decided to unify
> > >> > > stream and batch query with the same regular (batch) SQL. Therefore
> > >> > > we have removed the support for the STREAM keyword in Flink
> > >> > > Streaming SQL. In the past several months, Fabian and Xiaowei Jiang
> > >> > > have started to work on the future Relational Queries on Flink
> > >> > > streaming. Fabian has drafted a very good design doc,
> > >> > > https://goo.gl/m31kkE. The design is based on a new concept of
> > >> > > dynamic table whose content changes over time, and which thereby can
> > >> > > be derived from streams. With this dynamic table, stream query can
> > >> > > be done via regular (batch) SQL. Besides some syntax sugar, there is
> > >> > > not too much difference between batch query and stream query (in
> > >> > > terms of what and where of a query is executed). Stream query has
> > >> > > additional characteristics in terms of when to emit a result and how
> > >> > > to refine the result considering the retraction.
> > >> > >
> > >> > > Hope this helps and look forward to working with you on streaming
> > >> > > SQL.
> > >> > >
> > >> > > Regards,
> > >> > > Shaoxuan
> > >> > >
> > >> > >
> > >> > > On Wed, Jan 25, 2017 at 9:49 PM, Radu Tudoran
> > >> > > <radu.tudo...@huawei.com>
> > >> > > wrote:
> > >> > >
> > >> > > > Hi all,
> > >> > > >
> > >> > > > I would like to open a JIRA issue (and then provide the
> > >> > > > implementation) for supporting inner queries. The idea is to be
> > >> > > > able to support SQL queries such as the ones presented in the
> > >> > > > scenarios below.
> > >> > > > The key idea is that supporting inner queries would require
> > >> > > > having the implementation for:
> > >> > > >
> > >> > > > -> JOIN (type = left and condition = true) - Basically this is a
> > >> > > > simple implementation for a join function between 2 streams that
> > >> > > > does not require any window support behind the scenes as there is
> > >> > > > no condition on which to perform the join
> > >> > > >
> > >> > > > -> SINGLE_VALUE - this operator would require to provide one value
> > >> > > > to be further joined. In the context of streaming this value
> > >> > > > should basically evolve with the contents of the window. This
> > >> > > > could be implemented with a flatmap function as left joins would
> > >> > > > allow also to do the mapping with null values
> > >> > > >
> > >> > > > We can then extend this initial and simple implementation to
> > >> > > > provide support for joins in general (conditional joins, right
> > >> > > > joins..) or we can isolate this implementation for this specific
> > >> > > > case of inner queries and go with a totally new design for stream
> > >> > > > to stream joins (might be needed depending on what the decision
> > >> > > > is on how to support the conditional mapping)
> > >> > > >
> > >> > > > What do you think about this?
> > >> > > >
> > >> > > > Examples of scenarios to apply
> > >> > > >
> > >> > > > SELECT STREAM amount,
> > >> > > > (SELECT id FROM  inputstream1) AS field1 FROM inputstream2
> > >> > > >
> > >> > > > Translated to
> > >> > > > LogicalProject(amount=[$1], c=[$4])
> > >> > > >     LogicalJoin(condition=[true], joinType=[left])
> > >> > > >       LogicalTableScan(table=[[inputstream1]])
> > >> > > >       LogicalAggregate(group=[{}], agg#0=[SINGLE_VALUE($0)])
> > >> > > >         LogicalProject(user_id=[$0])
> > >> > > >           LogicalTableScan(table=[[inputstream2]])
> > >> > > >
> > >> > > > Or from the same stream - perhaps interesting for applying some
> > >> > > > more complex operations within the inner query
> > >> > > > SELECT STREAM amount,
> > >> > > > (SELECT id FROM  inputstream1) AS field1 FROM inputstream1
> > >> > > >
> > >> > > > Translated to
> > >> > > > LogicalProject(amount=[$1], c=[$4])
> > >> > > >     LogicalJoin(condition=[true], joinType=[left])
> > >> > > >       LogicalTableScan(table=[[inputstream1]])
> > >> > > >       LogicalAggregate(group=[{}], agg#0=[SINGLE_VALUE($0)])
> > >> > > >         LogicalProject(user_id=[$0])
> > >> > > >           LogicalTableScan(table=[[inputstream1]])
> > >> > > >
> > >> > > > Or used to do the projection
> > >> > > > SELECT STREAM amount, c  FROM (SELECT *,id AS c FROM inputstream1)
> > >> > > >
> > >> > > > Translated to
> > >> > > >   LogicalProject(amount=[$1], c=[$5])
> > >> > > >     LogicalProject(time=[$0], amount =[$1], date=[$2], id =[$4], c=[$5])
> > >> > > >       LogicalTableScan(table=[[inputstream1]])
> > >> > > >
> > >> > > >
> > >> > > > Or in the future even
> > >> > > > SELECT STREAM amount, myagg FROM  (SELECT STREAM *, SUM(amount)
> > >> > > > OVER window AS myagg FROM inputstream1)) ...
> > >> > > >
> > >> > > >
> > >> > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> > >
> > >
> >
>
