[jira] [Commented] (FLINK-6073) Support for SQL inner queries for proctime

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627294#comment-16627294
 ] 

ASF GitHub Bot commented on FLINK-6073:
---

fhueske closed pull request #3609: [FLINK-6073] [table] Support for SQL inner 
queries for proctime
URL: https://github.com/apache/flink/pull/3609
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
new file mode 100644
index 000..c623c3ccd79
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{ RelOptCluster, RelTraitSet }
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{ RelNode, RelWriter, BiRel }
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.streaming.api.datastream.{ AllWindowedStream, DataStream, KeyedStream, WindowedStream }
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.{ Window => DataStreamWindow }
+import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.expressions._
+import org.apache.flink.table.plan.logical._
+import org.apache.flink.table.plan.nodes.CommonAggregate
+import org.apache.flink.table.plan.nodes.datastream.DataStreamAggregate._
+import org.apache.flink.table.runtime.aggregate.AggregateUtil._
+import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
+import org.apache.flink.table.typeutils.{ RowIntervalTypeInfo, TimeIntervalTypeInfo }
+import org.apache.flink.types.Row
+import org.apache.calcite.rel.logical.LogicalJoin
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.flink.table.api.TableException
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.windowing.triggers.Trigger
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
+import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult
+import org.apache.flink.streaming.api.datastream.CoGroupedStreams.TaggedUnion
+import org.apache.flink.streaming.api.windowing.evictors.Evictor
+import org.apache.flink.streaming.api.windowing.evictors.Evictor.EvictorContext
+import java.lang.Iterable
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue
+import org.apache.flink.api.common.functions.RichFlatJoinFunction
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.util.Collector
+
+class DataStreamJoin(
+  calc: LogicalJoin,
+  cluster: RelOptCluster,
+  traitSet: RelTraitSet,
+  inputLeft: RelNode,
+  inputRight: RelNode,
+  rowType: RelDataType,
+  description: String)
+extends BiRel(cluster, traitSet, inputLeft, inputRight) with DataStreamRel {
+
+  override def deriveRowType(): RelDataType = rowType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new DataStreamJoin(
+      calc,
+  

[jira] [Commented] (FLINK-6073) Support for SQL inner queries for proctime

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627293#comment-16627293
 ] 

ASF GitHub Bot commented on FLINK-6073:
---

fhueske commented on issue #3609: [FLINK-6073] [table] Support for SQL inner 
queries for proctime
URL: https://github.com/apache/flink/pull/3609#issuecomment-424331591
 
 
   FLINK-9714 has been added and will be included in the next release. 
   Closing this PR.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support for SQL inner queries for proctime
> --
>
> Key: FLINK-6073
> URL: https://issues.apache.org/jira/browse/FLINK-6073
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: radu
>Assignee: radu
>Priority: Critical
>  Labels: features
> Attachments: innerquery.png
>
>
> Time target: Proc Time
> **SQL targeted query examples:**
>  
> Q1) `Select item, (select item2 from stream2) as itemExtern from stream1;`
> Comments: This is the main functionality targeted by this JIRA: combining the 
> results of an inner query into the main query.
> Q2) `Select s1.item, (Select a2 from table as t2 where t2.id = s1.id limit 1) from s1;`
> Comments:
> Another equivalent way to write the first inner-query example is with 
> LIMIT 1. This makes it equivalent to the SingleElementAggregation produced 
> when translating the main target syntax for inner queries. We must ensure that 
> both syntaxes are supported and implemented with the same functionality. 
> The inner query could also select elements from a table, not just from a 
> different stream. Supporting this should be handled in a separate sub-JIRA issue.
> **Description:**
> When Calcite parses the SQL inner query, it translates it into a join (a left 
> join with an always-true condition) between the output of the query on the 
> main stream and the output of a single-value aggregation on the inner query. 
> The translation logic is shown below:
> ```
> LogicalJoin [condition=true;type=LEFT]
>   LogicalSingleValue[type=aggregation]
>     …logic of inner query (LogicalProject, LogicalScan…)
>   …logic of main (outer) query (LogicalProject, LogicalScan…)
> ```
> `LogicalJoin[condition=true;type=LEFT]`: this can be considered a special-case 
> operation rather than a proper stream-to-stream join. The implementation 
> should attach a value from a different query to each output of the main stream. 
> `LogicalSingleValue[type=aggregation]`: this can be interpreted as the holder 
> of the single value that results from the inner query. As this operator 
> guarantees that the inner query brings no more than one value to the join, 
> there are several options for interpreting its functionality in the 
> streaming context:
> 1. Throw an error if the inner query returns more than one result. This 
> would be the typical behavior of standard SQL over a database. However, a 
> stream is very unlikely to emit only a single value, so this behavior would 
> be very limiting for streams in the inner query. It might be more useful and 
> common if the inner query is over a table. 
> 2. Interpret this operator as the guarantee that only one value is selected 
> at any given moment. The behavior then acts as a filter that selects one 
> value, and the output of this operator can evolve over time with the second 
> stream that drives the inner query. When the value evolves should depend on 
> what marks the progress of the stream (processing time, watermarks/event time, 
> ingestion time, window time partitions…).
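
To make option 1 concrete, the standard SQL reading of the translated plan can be sketched in plain Scala, with no Flink or Calcite dependency. This is a minimal sketch under stated assumptions: `singleValue` and `leftJoinOnTrue` are hypothetical names, both inputs are finite (batch) collections, and `Option` stands in for SQL NULL.

```scala
// Hypothetical sketch of option 1 (standard SQL semantics); not Flink or Calcite code.

// LogicalSingleValue: zero rows -> NULL (None), one row -> that row, more -> error.
def singleValue[B](inner: Seq[B]): Option[B] = inner match {
  case Seq()  => None
  case Seq(b) => Some(b)
  case _      => throw new IllegalStateException("inner query returned more than one row")
}

// LogicalJoin[condition=true;type=LEFT]: every main row is kept and the
// single inner value (or NULL) is attached to it.
def leftJoinOnTrue[A, B](main: Seq[A], inner: Seq[B]): Seq[(A, Option[B])] = {
  val value = singleValue(inner)
  main.map(a => (a, value))
}
```

With a stream as the inner input, the error case would be hit as soon as a second element arrives, which is the limitation option 1 describes.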
> In this JIRA issue, the evolution is marked by processing time, so the 
> operator follows option 2: at every moment, the single value held in the 
> operator's state can be updated with the latest element. The inner query 
> therefore always selects the last element (its fields, or other query-related 
> transformations based on the last value). This behavior is needed in many 
> scenarios, e.g., the typical problem of computing total income when incomes 
> are in multiple currencies and the total must be computed in one currency, 
> always using the latest exchange rate.
> This behavior is motivated also by the functionality of the 3rd SQL query 
> example – Q3  (using inner 
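
The exchange-rate scenario in the quoted description can be simulated in plain Scala to show the processing-time behavior of option 2. This is a minimal sketch, not the Flink operator: it assumes both inputs are already merged into one sequence in arrival (processing-time) order, and the types `Rate`/`Income` and the function `convertIncomes` are hypothetical.

```scala
// Hypothetical event types; the inner query feeds Rate, the main query feeds Income.
sealed trait Event
final case class Rate(usdPerEur: BigDecimal) extends Event
final case class Income(eur: BigDecimal) extends Event

// Events are assumed to be merged in processing-time (arrival) order.
// The fold carries the single held value and the results emitted so far.
def convertIncomes(events: Seq[Event]): List[Option[BigDecimal]] =
  events.foldLeft((Option.empty[BigDecimal], List.empty[Option[BigDecimal]])) {
    // An inner-query element replaces the held value and emits nothing.
    case ((_, out), Rate(r)) => (Some(r), out)
    // A main-stream element is emitted with the latest value attached (LEFT join:
    // None, i.e. NULL, if no rate has arrived yet).
    case ((latest, out), Income(eur)) => (latest, latest.map(eur * _) :: out)
  }._2.reverse
```

For the events `Income(10)`, `Rate(1.1)`, `Income(100)`, `Rate(1.2)`, `Income(50)`, the result is `None`, `110`, `60`: the first income arrives before any rate, and each later income uses whichever rate was most recent when it arrived.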

[jira] [Commented] (FLINK-6073) Support for SQL inner queries for proctime

2018-07-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16554184#comment-16554184
 ] 

ASF GitHub Bot commented on FLINK-6073:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3609
  
This PR can be closed as it will be addressed by FLINK-9714.
Thanks, Fabian


> This behavior is also motivated by the functionality of the third SQL query 
> example, Q3 (using an inner query as the input source for FROM). In such 
> scenarios, the selection in the main query would need to be based on the 
> latest elements. With this behavior, the two types of queries (Q1 and Q3) 
> would therefore provide the same, intuitive result.
> **Functionality example**
> Based on the logical translation plan, we next illustrate the behavior of the 
> inner query applied on 2 streams that operate on 

[jira] [Commented] (FLINK-6073) Support for SQL inner queries for proctime

2018-07-24 Thread Fabian Hueske (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16554181#comment-16554181
 ] 

Fabian Hueske commented on FLINK-6073:
--

The use case of this issue will be addressed by a processing-time 
time-versioned join (see FLINK-9714).
I will close this issue and the corresponding PR.
Thanks, Fabian


[jira] [Commented] (FLINK-6073) Support for SQL inner queries for proctime

2017-06-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16036251#comment-16036251
 ] 

ASF GitHub Bot commented on FLINK-6073:
---

Github user rtudoran commented on a diff in the pull request:

https://github.com/apache/flink/pull/3609#discussion_r120007611
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{ RelOptCluster, RelTraitSet }
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{ RelNode, RelWriter, BiRel }
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.streaming.api.datastream.{ AllWindowedStream, DataStream, KeyedStream, WindowedStream }
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.{ Window => DataStreamWindow }
+import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.expressions._
+import org.apache.flink.table.plan.logical._
+import org.apache.flink.table.plan.nodes.CommonAggregate
+import org.apache.flink.table.plan.nodes.datastream.DataStreamAggregate._
+import org.apache.flink.table.runtime.aggregate.AggregateUtil._
+import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
+import org.apache.flink.table.typeutils.{ RowIntervalTypeInfo, TimeIntervalTypeInfo }
+import org.apache.flink.types.Row
+import org.apache.calcite.rel.logical.LogicalJoin
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.flink.table.api.TableException
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.windowing.triggers.Trigger
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
+import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult
+import org.apache.flink.streaming.api.datastream.CoGroupedStreams.TaggedUnion
+import org.apache.flink.streaming.api.windowing.evictors.Evictor
+import org.apache.flink.streaming.api.windowing.evictors.Evictor.EvictorContext
+import java.lang.Iterable
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue
+import org.apache.flink.api.common.functions.RichFlatJoinFunction
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.util.Collector
+
+class DataStreamJoin(
+  calc: LogicalJoin,
+  cluster: RelOptCluster,
+  traitSet: RelTraitSet,
+  inputLeft: RelNode,
+  inputRight: RelNode,
+  rowType: RelDataType,
+  description: String)
+extends BiRel(cluster, traitSet, inputLeft, inputRight) with DataStreamRel {
+
+  override def deriveRowType(): RelDataType = rowType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new DataStreamJoin(
+      calc,
+      cluster,
+      traitSet,
+      inputs.get(0),
+      inputs.get(1),
+      rowType,
+      description + calc.getId())
+  }
+
+  override def toString: String = {
+    s"Join(${
+      if 

[jira] [Commented] (FLINK-6073) Support for SQL inner queries for proctime

2017-03-30 Thread radu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15949735#comment-15949735
 ] 

radu commented on FLINK-6073:
-

[~fhueske] [~shijinkui] [~Yuhong_kyo] [~sunjincheng121] [~twalthr] 
[~stefano.bortoli]
Just to recap the discussion so far and converge towards a conclusion: 
currently there are 2 options for supporting inner queries.
Option 1: Implement the inner query based on the strongly typed query model 
mentioned earlier in the discussion:
{code}
SELECT A1, B1  FROM T1, T2
   WHERE T1.A3 =  
   (SELECT Max(T1.A3) FROM T1 WHERE T1.A3 <= T2.B3)
{code}
In this case, we would implement a rule that detects this complex translation 
pattern and converts it into a simple implementation.
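
A batch reading of the Option 1 query can be sketched in plain Scala to show what such a rule would have to preserve: for each T2 row, the correlated subquery yields the largest A3 in T1 that does not exceed T2.B3 (inside the subquery, `T1` refers to its own scan of T1, per standard SQL scoping), and only T1 rows carrying exactly that value are joined. The row types `T1Row`/`T2Row`, the function `option1`, and the assumption that A3 and B3 are comparable `Long` values are hypothetical; unused columns are omitted.

```scala
// Hypothetical row types; only the columns used by the query are modelled.
final case class T1Row(a1: String, a3: Long)
final case class T2Row(b1: String, b3: Long)

def option1(t1: Seq[T1Row], t2: Seq[T2Row]): Seq[(String, String)] =
  for {
    r2 <- t2
    // Correlated subquery: MAX(A3) over T1 rows with A3 <= T2.B3; None plays the role of NULL.
    maxA3 = t1.map(_.a3).filter(_ <= r2.b3).reduceOption(_ max _)
    r1 <- t1
    // WHERE T1.A3 = (subquery): a NULL subquery result never matches.
    if maxA3.contains(r1.a3)
  } yield (r1.a1, r2.b1)
```

If A3 and B3 are timestamps, this selects for each T2 row the most recent T1 row at or before it; a dedicated streaming operator could likely implement that more simply than the aggregate-plus-join plan Calcite generates.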

Option 2: Implement the inner query based on a simplified, implicit link 
between the outer query and the inner query. This can be done for a query like:
{code}
SELECT B1,
  (SELECT A1 AS ab FROM T1
   WHERE proctime() BETWEEN current_timestamp - INTERVAL '1' HOUR AND current_timestamp
   LIMIT 1)
FROM T2
{code}
In such a query, the implicit assumption is that current_timestamp comes from 
the outer query. The reasoning is that the outer query always drives the query 
(a result is emitted only when something arrives from the main stream), so 
current_timestamp comes from the main stream, while proctime() refers to the 
time property carried by each event of the inner stream (in this case, the 
ingestion time). Moreover, this syntax is equivalent to the batch query, where 
the selection would be done using the now() function to filter based on time.
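
The implicit-link semantics of Option 2 can be sketched in plain Scala: current_timestamp is taken from each arriving T2 row, and only T1 rows whose processing time falls in the preceding hour qualify. This is a minimal sketch under stated assumptions: `T1Event`, `T2Event` and `option2` are hypothetical names, times are milliseconds, and LIMIT 1 is assumed to pick the most recent qualifying row (standard SQL leaves that choice unspecified).

```scala
import scala.concurrent.duration._

// Hypothetical rows; all times are processing-time instants in milliseconds.
final case class T1Event(a1: String, procTimeMs: Long)
final case class T2Event(b1: String, arrivalMs: Long)

def option2(
    t1: Seq[T1Event],
    t2: Seq[T2Event],
    window: FiniteDuration = 1.hour): Seq[(String, Option[String])] =
  t2.map { outer =>
    // current_timestamp is taken from the outer (driving) row.
    val now = outer.arrivalMs
    // proctime() BETWEEN now - window AND now (both bounds inclusive).
    val candidates = t1.filter(e => e.procTimeMs >= now - window.toMillis && e.procTimeMs <= now)
    // LIMIT 1: assumed to pick the most recent qualifying row.
    val picked = if (candidates.isEmpty) None else Some(candidates.maxBy(_.procTimeMs).a1)
    // A scalar subquery that finds no row yields NULL, modelled as None.
    (outer.b1, picked)
  }
```

Because only rows from the last hour can ever qualify, a streaming implementation would presumably need to retain just that window of T1 in state rather than the whole stream.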


[jira] [Commented] (FLINK-6073) Support for SQL inner queries for proctime

2017-03-30 Thread radu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15949103#comment-15949103
 ] 

radu commented on FLINK-6073:
-

LogicalProject(A1=[$0])
  LogicalJoin(condition=[AND(=($2, $4), =($2, $5), =($1, $3))], joinType=[inner])
    LogicalProject(A1=[$0], A2=[$1], A4=[$3])
      LogicalTableScan(table=[[T1]])
    LogicalAggregate(group=[{0, 2, 3}])
      LogicalSort(sort0=[$1], dir0=[ASC], fetch=[1])
        LogicalProject(B2=[$0], B4=[$1], A4=[$2], A43=[$2])
          LogicalJoin(condition=[AND(>=($1, -($2, 360)), <=($1, $2))], joinType=[inner])
            LogicalProject(B2=[$1], B4=[$3])
              LogicalTableScan(table=[[T2]])
            LogicalAggregate(group=[{3}])
              LogicalTableScan(table=[[T1]])

> Support for SQL inner queries for proctime
> --
>
> Key: FLINK-6073
> URL: https://issues.apache.org/jira/browse/FLINK-6073
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: radu
>Assignee: radu
>Priority: Critical
>  Labels: features
> Attachments: innerquery.png
>
>
> Time target: Proc Time
> **SQL targeted query examples:**
>  
> Q1) `Select  item, (select item2 from stream2 ) as itemExtern from stream1;`
> Comments: This is the main functionality targeted by this JIRA to enable to 
> combine in the main query results from an inner query.
> Q2) `Select  s1.item, (Select a2 from table as t2 where table.id = s1.id  
> limit 1) from s1;`
> Comments:
> Another equivalent way to write the first example of inner query is with 
> limit 1. This ensures the equivalency with the SingleElementAggregation used 
> when translated the main target syntax for inner query. We must ensure that 
> the 2 syntaxes are supported and implemented with the same functionality. 
> There is the option also to select elements in the inner query from a table 
> not just from a different stream. This should be a sub-JIRA issue implement 
> this support.
> **Description:**
> Parsing the SQL inner query via calcite is translated to a join function 
> (left join with always true condition) between the output of the query on the 
> main stream and the output of a single output aggregation operation on the 
> inner query. The translation logic is shown below
> ```
> LogicalJoin [condition=true;type=LEFT]
>   LogicalSingleValue[type=aggregation]
>   …logic of inner query (LogicalProject, LogicalScan…)
>   …logical of main,external query (LogicalProject, LogicalScan…))
> ```
> `LogicalJoin[condition=true;type=LEFT]`: it can be considered a special-case 
> operation rather than a proper stream-to-stream join. The implementation 
> should attach a value from a different query to the output of the main stream. 
> `LogicalSingleValue[type=aggregation]`: it can be interpreted as the holder 
> of the single value that results from the inner query. As this operator 
> guarantees that the inner query brings no more than one value to the join, 
> there are several options for interpreting its functionality in the 
> streaming context:
> 1. Throw an error if the inner query returns more than one result. This 
> would be the typical behavior of standard SQL over a DB. However, it is very 
> unlikely that a stream would emit only a single value, so this behavior 
> would be very limiting for streams in the inner query. It might be more 
> useful and common if the inner query is over a table. 
> 2. Interpret the usage of this operator as a guarantee that only one value 
> is selected at any moment. The behavior would then rather be that of a filter 
> selecting one value. This opens the option that the output of this operator 
> evolves in time with the second stream that drives the inner query. When to 
> evolve it should depend on what marks the evolution of the stream (processing 
> time, watermarks/event time, ingestion time, window time partitions…).
> In this JIRA issue, the evolution would be marked by processing time, and 
> the operator would work based on option 2. Hence, at every moment, the state 
> of the operator that holds one value can evolve with the latest elements. In 
> this way, the inner query always selects the last element (its fields, or 
> other query-related transformations based on the last value). This behavior 
> is needed in many scenarios, e.g., the typical problem of computing the total 
> income when incomes are in multiple currencies and the total needs to be 
> computed in one currency, always using the last exchange rate.
> This behavior is also motivated by the functionality of the 3rd SQL query 
> example, Q3 (using an inner query as the input source for FROM). In such 
> scenarios, the selection in the main query would need to be based on the 
> latest elements. With such a behavior, the 2 types of queries (Q1 and Q3) 
> would therefore provide the same, intuitive result.
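To make the two `LogicalSingleValue` options concrete, here is a minimal Python sketch that models them outside of Flink. It is not Calcite's or Flink's implementation: the names `single_value_strict` and `latest_value_join`, and the `(source, value)` event encoding, are hypothetical, and events are assumed to arrive already ordered by processing time.

```python
from typing import Any, Iterable, List, Optional, Tuple


class SingleValueError(Exception):
    """Raised when a scalar subquery yields more than one row (option 1)."""


def single_value_strict(rows: Iterable[Any]) -> Optional[Any]:
    # Option 1: standard SQL semantics. An empty result becomes NULL (None),
    # more than one row is an error.
    rows = list(rows)
    if len(rows) > 1:
        raise SingleValueError(f"inner query returned {len(rows)} rows")
    return rows[0] if rows else None


def latest_value_join(events: Iterable[Tuple[str, Any]]) -> List[Tuple[Any, Optional[Any]]]:
    # Option 2 (as proposed in the issue): each main-stream record is paired
    # with the most recent inner-query value seen so far in processing time.
    # As in the LEFT join of the plan, records arriving before any inner value get None.
    latest: Optional[Any] = None
    out: List[Tuple[Any, Optional[Any]]] = []
    for source, value in events:
        if source == "inner":
            latest = value
        else:
            out.append((value, latest))
    return out
```

For example, `latest_value_join([("main", 100), ("inner", 1.1), ("main", 50), ("inner", 1.2), ("main", 20)])` pairs 100 with `None`, 50 with 1.1 and 20 with 1.2, the "always use the last exchange rate" behavior; under option 1 the two-row rate stream would instead be rejected.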

[jira] [Commented] (FLINK-6073) Support for SQL inner queries for proctime

2017-03-30 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15949097#comment-15949097
 ] 

Fabian Hueske commented on FLINK-6073:
--

Can you try the following query 

{code}
SELECT 
  b, 
  (SELECT a AS ab FROM t1
     WHERE t1.proctime BETWEEN t2.proctime - INTERVAL '1' HOUR AND t2.proctime
     ORDER BY t1.proctime LIMIT 1) 
FROM t2
{code}
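The query above correlates the two streams on processing time. The Python sketch below evaluates that correlated subquery once per `t2` row; it assumes proctimes are plain seconds and that `t1` is already sorted by proctime, and the name `correlated_lookup` is hypothetical. Because `ORDER BY t1.proctime` sorts ascending by default, `LIMIT 1` picks the earliest `t1` row inside the one-hour window; the `latest` flag models the `ORDER BY t1.proctime DESC` variant.

```python
import bisect
from typing import Any, List, Optional, Tuple


def correlated_lookup(
    t1: List[Tuple[float, Any]],  # (proctime, a), sorted by proctime
    t2: List[Tuple[float, Any]],  # (proctime, b)
    bound: float = 3600.0,        # INTERVAL '1' HOUR, in seconds
    latest: bool = False,         # False: default ASC order, True: DESC order
) -> List[Tuple[Any, Optional[Any]]]:
    times = [p for p, _ in t1]
    out: List[Tuple[Any, Optional[Any]]] = []
    for p2, b in t2:
        # t1 rows with proctime BETWEEN p2 - bound AND p2 (both ends inclusive)
        lo = bisect.bisect_left(times, p2 - bound)
        hi = bisect.bisect_right(times, p2)
        if lo == hi:
            out.append((b, None))  # empty subquery result -> NULL
        else:
            idx = hi - 1 if latest else lo
            out.append((b, t1[idx][1]))
    return out
```

Each result depends only on `t1` rows whose proctime is not later than the `t2` row's proctime, so under processing time an emitted result never has to be revised when new `t1` rows arrive.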

[jira] [Commented] (FLINK-6073) Support for SQL inner queries for proctime

2017-03-30 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15949001#comment-15949001
 ] 

Fabian Hueske commented on FLINK-6073:
--

If you look at the implementation of [{{SINGLE_VALUE}} in Calcite|https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/sql/fun/SqlSingleValueAggFunction.java#L32], 
you see that it fails if more than one row is received. The query assumes 
that the user knows what she/he is doing: if the inner query returns more than 
a single row, it simply fails. IMO, that's the correct semantics. If we assume 
that {{SINGLE_VALUE}} always returns the last value, we change the semantics.

Moreover, the selected value would move with the stream and we would need to 
reprocess all results whenever the result of the inner query changes. 
The only way to keep previously emitted results fixed is to correlate both 
streams on time. That way we can be sure that we will not have to update a 
previously emitted result.
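The reprocessing cost is visible even with a legal uncorrelated scalar subquery such as `SELECT item, (SELECT MAX(rate) FROM rates) FROM main`, where the inner query always returns exactly one row. In the Python sketch below, a hypothetical changelog model and not Flink's retraction API, every change of the inner result forces a retraction and re-emission of all previously emitted rows.

```python
from typing import Any, Iterable, List, Optional, Tuple

Change = Tuple[str, Any, Optional[float]]  # ("+" insert or "-" retract, item, rate)


def uncorrelated_scalar_join(events: Iterable[Tuple[str, Any]]) -> List[Change]:
    emitted: List[Any] = []          # main-stream items emitted so far
    current: Optional[float] = None  # current result of SELECT MAX(rate)
    changelog: List[Change] = []
    for kind, value in events:
        if kind == "main":
            emitted.append(value)
            changelog.append(("+", value, current))
        else:  # a new rate arrives on the inner stream
            new = value if current is None else max(current, value)
            if new != current:
                # the scalar changed: every earlier result is now stale
                for item in emitted:
                    changelog.append(("-", item, current))
                    changelog.append(("+", item, new))
                current = new
    return changelog
```

The number of updates grows with the number of rows already emitted; correlating both streams on time keeps earlier results fixed and avoids this.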

[jira] [Commented] (FLINK-6073) Support for SQL inner queries for proctime

2017-03-30 Thread radu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15948935#comment-15948935
 ] 

radu commented on FLINK-6073:
-

[~fhueske], I understand what you meant. I think you are right: the logic of 
the single value should be consistent!

As far as I can see, we now have 2 possibilities:

1) Implement the inner query based on the model you mentioned. In this case, 
we would look for this particular pattern and simplify it.

2) Implement the inner query for queries that have time bounds (e.g., proctime 
bounds).

For example:

{code}
SELECT B1, (SELECT A1 as ab FROM T1 WHERE proctime() BETWEEN current_timestamp - INTERVAL '1' HOUR AND current_timestamp) FROM T2
{code}

which would translate to:

{code}
LogicalJoin(condition=[true], joinType=[left])
  LogicalProject(B1=[$0])
    LogicalTableScan(table=[[T2]])
  LogicalAggregate(group=[{}], agg#0=[SINGLE_VALUE($0)])
    LogicalFilter(condition=[AND(>=(PROCTIME, -(CURRENT_TIMESTAMP, 360)), <=($3, CURRENT_TIMESTAMP))])
      LogicalTableScan(table=[[T1]])
{code}

This would leverage the fact that the inner query works on time bounds.
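A rough Python model of the time-bounded option, assuming a single merged event sequence ordered by processing time and timestamps in seconds (the function name and the event tuples are hypothetical). The time bound means only `T1` rows from the last hour have to be kept in state, but the standard `SINGLE_VALUE` check still fails as soon as that hour contains more than one `T1` row.

```python
from collections import deque
from typing import Any, Deque, Iterable, List, Optional, Tuple


class SingleValueError(Exception):
    pass


def time_bounded_single_value(
    events: Iterable[Tuple[float, str, Any]],  # (proctime, "T1" or "T2", value)
    bound: float = 3600.0,                     # INTERVAL '1' HOUR, in seconds
) -> List[Tuple[Any, Optional[Any]]]:
    window: Deque[Tuple[float, Any]] = deque()  # T1 rows inside the time bound
    out: List[Tuple[Any, Optional[Any]]] = []
    for t, source, value in events:
        # evict T1 rows older than current_timestamp - INTERVAL '1' HOUR
        while window and window[0][0] < t - bound:
            window.popleft()
        if source == "T1":
            window.append((t, value))
        else:
            # SINGLE_VALUE over the rows currently in the window
            if len(window) > 1:
                raise SingleValueError(f"{len(window)} rows in the last hour")
            out.append((value, window[0][1] if window else None))
    return out
```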

[jira] [Commented] (FLINK-6073) Support for SQL inner queries for proctime

2017-03-30 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15948774#comment-15948774
 ] 

Fabian Hueske commented on FLINK-6073:
--

The semantics of the query are already defined by the SQL standard.
If you run the above query with a regular DBMS on tables which contain the 
materialized stream, it will fail. 
We cannot interpret it differently if we want to maintain the unified 
batch-stream semantics.

[jira] [Commented] (FLINK-6073) Support for SQL inner queries for proctime

2017-03-30 Thread radu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15948765#comment-15948765
 ] 

radu commented on FLINK-6073:
-

[~fhueske] Thanks for the remarks.

My idea about queries such as the one you mention:

{code}
SELECT T1.A1,
   (SELECT Max(T2.B1) OVER (ORDER BY T2.B4 RANGE INTERVAL '1' HOUR PRECEDING) FROM T2)
 FROM T1 
{code}

is that the single value should be tied to the triggering of the window 
(basically, to the evolution of the stream). In this case, it would behave just 
like any other aggregate, except that it ensures that only one result is 
forwarded whenever the window triggers. 

[jira] [Commented] (FLINK-6073) Support for SQL inner queries for proctime

2017-03-30 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15948683#comment-15948683
 ] 

Fabian Hueske commented on FLINK-6073:
--

Hi [~rtudoran], 

I agree, the query I proposed would not be easy to optimize. We would need some 
well designed rules to transform the logical plan into an efficient execution 
plan.
However, the query produces the desired result semantics that you described in 
the JIRA. IMO, the semantics of the queries are fixed and we cannot change 
those.
What we can do, though, is focus first on different join semantics that are 
easier to implement.

The query you proposed in 1) 

{code}
SELECT T1.A1,
   (SELECT Max(T2.B1) OVER (ORDER BY T2.B4 RANGE INTERVAL '1' HOUR PRECEDING) FROM T2)
 FROM T1 
{code}

has different semantics than the original result and would fail on a stream 
because the inner query {{(SELECT Max(T2.B1) OVER (ORDER BY T2.B4 RANGE 
INTERVAL '1' HOUR PRECEDING) FROM T2)}} would return more than a single row (in 
fact one row for each stream record). If the goal is that we have different 
exchange rates in the result, the intermediate result with which we join (i.e., 
the result of the subquery) needs to have at least one row for each exchange 
rate. Hence, we cannot implement it as an inner query in a {{SELECT}} clause 
but need a regular join with the other table.

The second query that you proposed suffers from the same problem. The subquery 
returns more than a single row.
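A small Python model of `MAX(T2.B1) OVER (ORDER BY T2.B4 RANGE INTERVAL '1' HOUR PRECEDING)` shows the problem. It assumes `B4` holds seconds and that rows are sorted by `B4`, and `over_range_max` is a hypothetical name. An OVER window emits one row per input row, so the subquery yields as many rows as `T2` has, and a scalar subquery over it fails whenever `T2` has more than one row.

```python
import bisect
from typing import List, Tuple


def over_range_max(rows: List[Tuple[float, float]], preceding: float = 3600.0) -> List[float]:
    # rows: (b4, b1) sorted by b4; returns one MAX(b1) per input row
    keys = [b4 for b4, _ in rows]
    out: List[float] = []
    for b4, _ in rows:
        # RANGE frame: every row with b4 - preceding <= key <= b4, peers included
        lo = bisect.bisect_left(keys, b4 - preceding)
        hi = bisect.bisect_right(keys, b4)
        out.append(max(b1 for _, b1 in rows[lo:hi]))
    return out
```

For three `T2` rows at 0 s, 1800 s and 4000 s with rates 1.0, 1.5 and 1.2, the window aggregate returns three values (1.0, 1.5, 1.5), not one.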

We could do something like:

{code}
SELECT amount, maxRate
FROM T1, 
  (SELECT TUMBLE_END(time, INTERVAL '1' HOUR) AS hour, MAX(exchange) AS maxRate 
   FROM T2 GROUP BY TUMBLE(time, INTERVAL '1' HOUR)) rates
WHERE CEIL(T1.time TO HOUR) = rates.hour
{code}

This would join each amount with the maxRate that was observed within the same 
hour (if amount is at 11:58:00 the max rate from 11:00:00 to 11:59:59).
But this would not be the result you asked for.
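The tumbling-window join can be modeled in a few lines of Python, assuming timestamps in seconds; `hourly_max_rate_join` is a hypothetical name. For simplicity, the sketch keys both sides by the end of the one-hour window that contains them; this matches the `CEIL`-based condition in the query except for amounts that fall exactly on an hour boundary.

```python
from typing import Dict, List, Tuple


def hourly_max_rate_join(
    amounts: List[Tuple[int, float]],  # (time in seconds, amount)
    rates: List[Tuple[int, float]],    # (time in seconds, exchange rate)
    size: int = 3600,                  # tumbling window size: one hour
) -> List[Tuple[float, float]]:
    def window_end(t: int) -> int:
        # end of the tumbling window [k*size, (k+1)*size) that contains t
        return (t // size + 1) * size

    # GROUP BY TUMBLE(time, INTERVAL '1' HOUR) with MAX(exchange)
    max_rate: Dict[int, float] = {}
    for t, rate in rates:
        end = window_end(t)
        max_rate[end] = max(max_rate.get(end, rate), rate)

    # inner join on the window end; amounts without a rate in their hour are dropped
    out: List[Tuple[float, float]] = []
    for t, amount in amounts:
        end = window_end(t)
        if end in max_rate:
            out.append((amount, max_rate[end]))
    return out
```

With rates 1.1 at 11:10:00 and 1.3 at 11:40:00, an amount at 11:05:00 (39900 s) is paired with 1.3, a rate observed after the amount, which is why this is not the "latest rate" result. An amount at 13:53:20 (50000 s) has no rate in its hour and is dropped.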

[jira] [Commented] (FLINK-6073) Support for SQL inner queries for proctime

2017-03-28 Thread radu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15945125#comment-15945125
 ] 

radu commented on FLINK-6073:
-

Hi [~fhueske] thanks again for the feedback - it is really helpful and i 
understand what do you mean for the retraction. I think with respect to this we 
can create more aggregators such as for single value, for list of outputs, join 
mapon which we can operate the retraction in a similar way with the model 
we used. But this i think it make sense after the  FLINK-6047 is done.

Anyway - meanwhile i like the idea you say of reshaping the query and than work 
on it (whether now or after the  FLINK-5884 is done).

I parsed the query model you proposed:
{code}
SELECT A1, B1 
FROM T1, T2
WHERE T1.A3 = (SELECT Max(T1.A3) FROM T1
               WHERE T1.A3 <= T2.B3)
{code}

The resulting logical plan is:

{code}
LogicalProject(A1=[$5], B1=[$2])
  LogicalJoin(condition=[=($7, $1)], joinType=[inner])
    LogicalJoin(condition=[=($4, $0)], joinType=[inner])
      LogicalAggregate(group=[{1}], EXPR$0=[MAX($0)])
        LogicalJoin(condition=[<=($0, $1)], joinType=[inner])
          LogicalProject(A3=[$2])
            LogicalTableScan(table=[[T1]])
          LogicalAggregate(group=[{1}])
            LogicalJoin(condition=[true], joinType=[inner])
              LogicalProject(A1=[$0])
                LogicalTableScan(table=[[T1]])
              LogicalProject(B3=[$2])
                LogicalTableScan(table=[[T2]])
      LogicalTableScan(table=[[T2]])
    LogicalTableScan(table=[[T1]])
{code}


The problem is not that the query is more verbose, but:
1) The plan has many operators that are used to implement the corresponding 
logic. One option is, of course, to implement each logical operator as a 
corresponding physical operator. On the one hand, this gives the correct logic; 
on the other hand, it is very costly in terms of compute resources. The 
alternative would be to identify the pattern produced by parsing this query and 
map it to the simple implementation I proposed in the design document.
2) We could (perhaps not for the currency exchange scenario, but for others) 
define a window boundary for the inner query.
3) If you look at how the query is parsed, its core contains the following 
pattern:
{code}
LogicalJoin(condition=[true], joinType=[inner])
  LogicalProject(A1=[$0])
    LogicalTableScan(table=[[T1]])
  LogicalProject(B3=[$2])
    LogicalTableScan(table=[[T2]])
{code}
This is again an unconditional join without boundaries, which in the end 
suffers from the same problem as the initial query model: it does not comply 
with batch equivalence.

Considering all this, I would propose two possible middle-ground options:
1) Have the inner query work on time boundaries. For example, a query in which 
we extract the max timestamp:
{code}
SELECT T1.A1,
  (SELECT Max(T2.B1) OVER (ORDER BY T2.B4 RANGE INTERVAL '1' HOUR PRECEDING) 
   FROM T2)
FROM T1 
{code}
would be translated to:
{code}
LogicalJoin(condition=[true], joinType=[left])
  LogicalProject(A1=[$0])
    LogicalTableScan(table=[[T1]])
  LogicalAggregate(group=[{}], agg#0=[SINGLE_VALUE($2)])
    LogicalWindow(window#0=[window(partition {} order by [1] range between $2 PRECEDING and CURRENT ROW aggs [MAX($0)])])
      LogicalProject(B1=[$0], B4=[$3])
        LogicalTableScan(table=[[T2]])
{code}
=> This is simpler, and the join could be driven by the time boundaries of the 
window.

2) Have the inner query carry a time condition, as in the example you gave:
{code}
SELECT T1.A1,
  (SELECT T2.B1 FROM T2 WHERE T1.A4 <= T2.B4)
FROM T1
{code}
which already results in quite a complex plan:
{code}
LogicalProject(A1=[$0], EXPR$1=[$5])
  LogicalJoin(condition=[=($3, $4)], joinType=[left])
    LogicalTableScan(table=[[T1]])
    LogicalAggregate(group=[{2}], agg#0=[SINGLE_VALUE($0)])
      LogicalJoin(condition=[<=($2, $1)], joinType=[inner])
        LogicalProject(B1=[$0], B4=[$3])
          LogicalTableScan(table=[[T2]])
        LogicalAggregate(group=[{3}])
          LogicalTableScan(table=[[T1]])
{code}
=> In this case we can directly consider the proper query you proposed.
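Option 1 can be illustrated with a small in-memory Scala sketch (not Flink code; `InnerRow`, `windowMax` and `enrich` are hypothetical names). It assumes processing-time timestamps in milliseconds, that every main-stream (T1) row is enriched with MAX(B1) over the inner-stream (T2) rows that arrived within the preceding hour, and that the attached value is NULL (None) when no inner row falls into that range.

```scala
object WindowedInnerQuery {
  // Hypothetical inner-stream (T2) row: the value B1 and its processing time in milliseconds.
  final case class InnerRow(b1: Long, procTime: Long)

  // Width of the RANGE INTERVAL '1' HOUR PRECEDING frame.
  val OneHourMs: Long = 60L * 60L * 1000L

  /** MAX(B1) over the inner rows whose time lies in [now - 1 hour, now]; None if there are none. */
  def windowMax(inner: Seq[InnerRow], now: Long): Option[Long] = {
    val inRange = inner.filter(r => r.procTime <= now && r.procTime >= now - OneHourMs)
    if (inRange.isEmpty) None else Some(inRange.map(_.b1).max)
  }

  /**
   * LEFT join with condition=true: every main-stream row (A1, processing time)
   * is kept and gets the current window maximum attached (None acts as NULL).
   */
  def enrich(main: Seq[(String, Long)], inner: Seq[InnerRow]): Seq[(String, Option[Long])] =
    main.map { case (a1, now) => (a1, windowMax(inner, now)) }
}
```

For example, with inner rows 5 at 0 ms, 7 at 30 minutes and 3 at 2 hours, a main row arriving at 30 minutes gets 7 and a main row arriving at 2 hours gets 3, because the row from 30 minutes has left the one-hour range. In a streaming operator, rows older than one hour could be evicted from state, which is what keeps this option bounded; the sketch simply filters a complete `Seq`.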

I would propose focusing on option 1, as it also makes sense from the 
perspective of the other queries. What do you think?
 



[jira] [Commented] (FLINK-6073) Support for SQL inner queries for proctime

2017-03-28 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15944870#comment-15944870
 ] 

Fabian Hueske commented on FLINK-6073:
--

Hi [~rtudoran],

The stream/batch consistency model that we want to apply is that the result of 
a query should, at any point in time, be the same as if a batch query were 
executed on the materialized streams.
So, given your input and your query 2 ({{SELECT amount, (SELECT exchange FROM 
T1 ORDER BY time LIMIT 1) AS field1 FROM T2}}), the output would evolve as 
follows:

Result at T2
(10, 1.2) <-- appended

Result at T3
(10, 1.2)
(11, 1.2) <-- appended

Result at T4
(10, 1.3) <-- updated!
(11, 1.3) <-- updated!

Result at T5
(10, 1.3)
(11, 1.3)
(9, 1.3) <-- appended

As you can see, the previously emitted rows need to be updated at time T4 
because the subquery ({{(SELECT exchange FROM T1 ORDER BY time LIMIT 1)}}) 
produces a new result.
In principle, this would be done using retraction messages, which invalidate 
previously emitted rows, followed by the updated results. However, retraction 
would not work in this case, because we would need to remember the whole T2 
input stream, which is not possible. There is a proposal for retraction in 
FLINK-6047 with an attached design document. So retraction is not only about 
aggregation functions but also about being able to invalidate or update 
previous results.

Now, the good news: 
We do not need retraction to implement the use case you are proposing in 
processing time, because we can compute the final result at any point in time 
and do not need to update it later.
However, the query needs to be specified differently, as I described above. 
I know the query is much more verbose than the queries you suggested, but it 
captures the semantics of what you want to compute. Since we need to access 
the time attributes of different tables, we are blocked on FLINK-5884, which 
will handle time indicators as attributes instead of indicator methods.

What do you think?


[jira] [Commented] (FLINK-6073) Support for SQL inner queries for proctime

2017-03-28 Thread radu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15944671#comment-15944671
 ] 

radu commented on FLINK-6073:
-

Hi [~fhueske],

Thanks for the review. I agree that we should support stream-batch 
equivalence, as we also discussed on the mailing list. My view of the problem 
is that the stream should output the same result as if, at that moment, the 
data were put into a table and the same query were run. In this case, emitting 
a result at T3 as in my initial example is equivalent to running the query on

| T1 | 1.2 | |
| T2 | User1, 10 | (10, 1.2) |
| T3 | User2, 11 | (11, 1.2) |

while emitting the result at T5 is equivalent to running the query over

| ... | | |
| T4 | 1.3 | |
| T5 | User3, 9 | (9, 1.3) |
Nevertheless, it is very important to be consistent with what has been done so 
far, so I would like to try to model this query/behavior accordingly. In the 
generic case, this would imply keeping the whole of T2, as you mention. As this 
is not possible, the next step is to restrict the inner queries to be time 
bounded (window queries). This also makes it easier to ensure that the output 
complies with the batch result. To begin with, we can work just as in the case 
of window aggregates and support emitting the result only once.

Regarding retraction support, I must admit I need some additional 
clarification. Looking at how retraction is implemented for the aggregators, 
we have an accumulate and a retract method that we need to implement. However, 
these apply only to the state in which we keep the accumulators. When we emit 
a result with out.collect(new Row(accumulator.getValue())), that result is not 
retracted. Maybe I am missing something, so it would be great if you could 
share how you see retraction working in this case; I can then iterate over the 
proposal to include it. 
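The distinction raised here, between retracting a value from an accumulator and retracting a row that was already emitted, can be illustrated with a small Scala sketch. It is not Flink's aggregation API and not the FLINK-6047 design; `SumAcc` and `run` are hypothetical. The operator remembers the last result it emitted per key and, whenever the accumulator changes, first sends a retraction message for that row and then the updated row, encoded as `(false, row)` and `(true, row)`.

```scala
import scala.collection.mutable

object RetractingSum {
  // Hypothetical accumulator exposing the two methods discussed above.
  final class SumAcc {
    var sum: Long = 0L
    def accumulate(v: Long): Unit = sum += v
    def retract(v: Long): Unit = sum -= v
  }

  /**
   * Grouped SUM over (key, value, isInsert) input records. Every change of a
   * group's result is reported downstream as (false, oldRow) followed by
   * (true, newRow); the first result of a key is only inserted.
   */
  def run(input: Seq[(String, Long, Boolean)]): Seq[(Boolean, (String, Long))] = {
    val accs = mutable.Map.empty[String, SumAcc]
    val lastEmitted = mutable.Map.empty[String, Long]
    val out = mutable.ArrayBuffer.empty[(Boolean, (String, Long))]
    for ((key, value, isInsert) <- input) {
      val acc = accs.getOrElseUpdate(key, new SumAcc)
      // Updating the accumulator changes operator state only ...
      if (isInsert) acc.accumulate(value) else acc.retract(value)
      // ... rows already sent downstream must be retracted explicitly.
      lastEmitted.get(key).foreach(old => out += ((false, (key, old))))
      out += ((true, (key, acc.sum)))
      lastEmitted(key) = acc.sum
    }
    out.toList
  }
}
```

For the input `a +10, a +5, a -10` the output is `+(a,10)`, `-(a,10)`, `+(a,15)`, `-(a,15)`, `+(a,5)`: collected rows are never undone in place; downstream operators receive explicit retraction messages instead.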


[jira] [Commented] (FLINK-6073) Support for SQL inner queries for proctime

2017-03-27 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943164#comment-15943164
 ] 

Fabian Hueske commented on FLINK-6073:
--

Thanks for this proposal. 
I think we discussed this use case on the [mailing list 
before|https://lists.apache.org/thread.html/7ec475aad09f10488d091857abd3c9fbcbc2a4127cd0f88dabb47595@%3Cdev.flink.apache.org%3E].

One of the main goals of Flink's Table API and SQL on streams is unified 
semantics for queries on streams and batch tables.
This is very important in order to run the same query on streams and on 
historic data (archived streams).

So far, the APIs comply with this requirement. Streams are logically converted 
into tables by appending events to a conceptual table.
Running a query on such an append table must return the same result regardless 
of whether it is executed in a streaming fashion (the append table is 
continuously updated) or in a batch fashion (the append table is already fixed).

This is not the case for the semantics that you propose in this JIRA. 

Let's take your example of two streams. If we turn Stream1 and Stream2 into two 
tables T1 and T2 by appending all records we would get:

T1
|| time || exchange ||
| T1 | 1.2 |
| T4 | 1.3 |

T2
|| time || user || amount || 
| T2 | User1 | 10 |
| T3 | User2 | 11 |
| T5 | User3 | 9 |

Let's see the results if we execute the proposed queries on these tables in a 
batch fashion:

- Q1 ({{SELECT amount, (SELECT exchange FROM T1) AS field1 FROM T2}}) would 
fail because T1 contains more than a single row.
- Q2 ({{SELECT amount, (SELECT exchange FROM T1 ORDER BY time LIMIT 1) AS 
field1 FROM T2}}) would return the following result 

|| amount || exchange ||
| 10 | 1.3 |
| 11 | 1.3 |
| 9 | 1.3 |

This is different from the result that you want to compute because the result 
of the inner query is 1.3 and there is no time-based predicate between T1 and 
T2.
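Both batch outcomes follow from the usual SQL rule for scalar subqueries: no row yields NULL, exactly one row yields its value, more than one row is an error. The Scala sketch below applies that rule to the materialized T1 above; all names are hypothetical. As in the result table, it assumes that Q2's `ORDER BY time LIMIT 1` picks the most recent row; with ascending order it would return 1.2 instead.

```scala
object ScalarSubquery {
  // The materialized T1 table from the example; the times T1 and T4 are encoded as 1 and 4.
  final case class Rate(time: Int, exchange: Double)
  val t1: Seq[Rate] = Seq(Rate(1, 1.2), Rate(4, 1.3))

  /** Scalar-subquery rule: zero rows give NULL (None), one row gives its value, more rows fail. */
  def singleValue[A](rows: Seq[A]): Option[A] = rows match {
    case Seq()  => None
    case Seq(x) => Some(x)
    case _      => throw new IllegalStateException(s"scalar subquery returned ${rows.size} rows")
  }

  // Q1: (SELECT exchange FROM T1); fails because T1 has two rows.
  def q1(): Option[Double] = singleValue(t1.map(_.exchange))

  // Q2: (SELECT exchange FROM T1 ORDER BY time LIMIT 1), assuming the most recent row is picked.
  def q2(): Option[Double] = singleValue(t1.sortBy(-_.time).take(1).map(_.exchange))

  // The outer query attaches the scalar value to every amount of T2.
  def q2Result(amounts: Seq[Int]): Seq[(Int, Option[Double])] = amounts.map(a => (a, q2()))
}
```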

The batch query that would produce the correct result would look like this 
(given that there are no two records with the same time in T1):

{code}
SELECT amount, exchange 
FROM T1, T2
WHERE T1.time = ( 
  SELECT MAX(t1_2.time) 
  FROM T1 AS t1_2
  WHERE t1_2.time <= T2.time)
{code}

Due to the unified stream batch semantics, this should also be the query that 
produces the correct result on a stream.
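To see that this query yields the result originally intended for the stream, the following Scala sketch evaluates it on the materialized tables. It is a nested-loop illustration with hypothetical names, not how Flink or a database would plan the query: for each T2 row it computes the correlated MAX(t1_2.time) and keeps the T1 rows whose time equals it.

```scala
object AsOfJoin {
  // The materialized example tables; the times T1..T5 are encoded as 1..5.
  final case class Rate(time: Int, exchange: Double)
  final case class Payment(time: Int, user: String, amount: Int)

  val t1: Seq[Rate] = Seq(Rate(1, 1.2), Rate(4, 1.3))
  val t2: Seq[Payment] =
    Seq(Payment(2, "User1", 10), Payment(3, "User2", 11), Payment(5, "User3", 9))

  /** Nested-loop evaluation of SELECT amount, exchange FROM T1, T2 WHERE T1.time = (correlated MAX). */
  def evaluate(rates: Seq[Rate], payments: Seq[Payment]): Seq[(Int, Double)] =
    payments.flatMap { p =>
      // Correlated subquery: SELECT MAX(t1_2.time) FROM T1 AS t1_2 WHERE t1_2.time <= T2.time
      val times = rates.map(_.time).filter(_ <= p.time)
      val maxTime: Option[Int] = if (times.isEmpty) None else Some(times.max)
      // Outer predicate T1.time = (subquery); a NULL result (None) matches no row.
      rates.filter(r => maxTime.contains(r.time)).map(r => (p.amount, r.exchange))
    }
}
```

On the example data this gives (10, 1.2), (11, 1.2) and (9, 1.3), i.e. each payment is combined with the rate that was valid when the payment arrived.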

While I agree that unified semantics for batch and stream processing are more 
meaningful for event-time processing, I am convinced that this does not mean 
that processing-time queries should have different semantics than event-time 
queries.
IMO, the semantics of processing-time and event-time queries should be as close 
as possible. Also, the question of retraction is independent of event time and 
processing time. Event time might need more retractions due to out-of-order or 
late data, but processing-time operators need to support retraction as well. 
Consider, for example, emitting early results for a 1-hour window, computing 
aggregates without a window clause, or filtering with a time-based predicate. 
Similarly, joins like the one above would need to retract the 1.2 exchange rate 
and replace it with 1.3 (in fact, since this would require buffering the 
complete T2 input stream, such a query could not be executed).

Best, Fabian


[jira] [Commented] (FLINK-6073) Support for SQL inner queries for proctime

2017-03-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15942298#comment-15942298
 ] 

ASF GitHub Bot commented on FLINK-6073:
---

Github user rtudoran commented on a diff in the pull request:

https://github.com/apache/flink/pull/3609#discussion_r108062687
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
 ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{ RelOptCluster, RelTraitSet }
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{ RelNode, RelWriter, BiRel }
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.streaming.api.datastream.{ AllWindowedStream, DataStream, KeyedStream, WindowedStream }
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.{ Window => DataStreamWindow }
+import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.expressions._
+import org.apache.flink.table.plan.logical._
+import org.apache.flink.table.plan.nodes.CommonAggregate
+import org.apache.flink.table.plan.nodes.datastream.DataStreamAggregate._
+import org.apache.flink.table.runtime.aggregate.AggregateUtil._
+import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
+import org.apache.flink.table.typeutils.{ RowIntervalTypeInfo, TimeIntervalTypeInfo }
+import org.apache.flink.types.Row
+import org.apache.calcite.rel.logical.LogicalJoin
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.flink.table.api.TableException
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.windowing.triggers.Trigger
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
+import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult
+import org.apache.flink.streaming.api.datastream.CoGroupedStreams.TaggedUnion
+import org.apache.flink.streaming.api.windowing.evictors.Evictor
+import org.apache.flink.streaming.api.windowing.evictors.Evictor.EvictorContext
+import java.lang.Iterable
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue
+import org.apache.flink.api.common.functions.RichFlatJoinFunction
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.util.Collector
+
+class DataStreamJoin(
+  calc: LogicalJoin,
+  cluster: RelOptCluster,
+  traitSet: RelTraitSet,
+  inputLeft: RelNode,
+  inputRight: RelNode,
+  rowType: RelDataType,
+  description: String)
+extends BiRel(cluster, traitSet, inputLeft, inputRight) with DataStreamRel {
+
+  override def deriveRowType(): RelDataType = rowType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new DataStreamJoin(
+      calc,
+      cluster,
+      traitSet,
+      inputs.get(0),
+      inputs.get(1),
+      rowType,
+      description + calc.getId())
+  }
+
+  override def toString: String = {
+    s"Join(${
+      if 

[jira] [Commented] (FLINK-6073) Support for SQL inner queries for proctime

2017-03-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15942266#comment-15942266
 ] 

ASF GitHub Bot commented on FLINK-6073:
---

Github user shijinkui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3609#discussion_r108059835
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
 ---
+class DataStreamJoin(
--- End diff --

need scaladoc to describe the class's responsibility



[jira] [Commented] (FLINK-6073) Support for SQL inner queries for proctime

2017-03-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15942267#comment-15942267
 ] 

ASF GitHub Bot commented on FLINK-6073:
---

Github user shijinkui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3609#discussion_r108059875
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
 ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{ RelOptCluster, RelTraitSet }
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{ RelNode, RelWriter, BiRel }
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.streaming.api.datastream.{ AllWindowedStream, 
DataStream, KeyedStream, WindowedStream }
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.{ Window => 
DataStreamWindow }
+import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.expressions._
+import org.apache.flink.table.plan.logical._
+import org.apache.flink.table.plan.nodes.CommonAggregate
+import org.apache.flink.table.plan.nodes.datastream.DataStreamAggregate._
+import org.apache.flink.table.runtime.aggregate.AggregateUtil._
+import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
+import org.apache.flink.table.typeutils.{ RowIntervalTypeInfo, 
TimeIntervalTypeInfo }
+import org.apache.flink.types.Row
+import org.apache.calcite.rel.logical.LogicalJoin
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.flink.table.api.TableException
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.windowing.triggers.Trigger
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
+import 
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult
+import 
org.apache.flink.streaming.api.datastream.CoGroupedStreams.TaggedUnion
+import org.apache.flink.streaming.api.windowing.evictors.Evictor
+import 
org.apache.flink.streaming.api.windowing.evictors.Evictor.EvictorContext
+import java.lang.Iterable
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import 
org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue
+import org.apache.flink.api.common.functions.RichFlatJoinFunction
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.util.Collector
+
+class DataStreamJoin(
+  calc: LogicalJoin,
+  cluster: RelOptCluster,
+  traitSet: RelTraitSet,
+  inputLeft: RelNode,
+  inputRight: RelNode,
+  rowType: RelDataType,
+  description: String)
+  extends BiRel(cluster, traitSet, inputLeft, inputRight) with DataStreamRel {
+
+  override def deriveRowType(): RelDataType = rowType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new DataStreamJoin(
+      calc,
+      cluster,
+      traitSet,
+      inputs.get(0),
+      inputs.get(1),
+      rowType,
+      description + calc.getId())
+  }
+
+  override def toString: String = {
+    s"Join(${
+      if 

[jira] [Commented] (FLINK-6073) Support for SQL inner queries for proctime

2017-03-24 Thread radu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940880#comment-15940880
 ] 

radu commented on FLINK-6073:
-

The join window can consist of a single element, because we emit a result for
every incoming event from the main (left) stream. For incoming events from the
right (inner) stream, we can cache the latest value in a ValueState.
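The single-element join window described in this comment can be modelled without Flink as a minimal sketch: the right (inner) stream overwrites one cached value, and every left (main) event is emitted immediately together with whatever is cached. The class name `LatestValueLeftJoin` is hypothetical, and the plain `var` only stands in for the Flink `ValueState` mentioned above; in a real job that state would live inside a Flink operator, which this sketch does not model.

```scala
// Hypothetical plain-Scala model of the proposed operator (no Flink dependency).
final class LatestValueLeftJoin[L, R] {
  // Stands in for a Flink ValueState[R]: None until the first right event arrives.
  private var latestRight: Option[R] = None

  /** Inner-stream event: replace the single cached value (a one-element "window"). */
  def onRight(value: R): Unit = latestRight = Some(value)

  /** Main-stream event: emit at once; None plays the role of SQL NULL in the LEFT join. */
  def onLeft(value: L): (L, Option[R]) = (value, latestRight)
}

object LatestValueLeftJoinDemo {
  def main(args: Array[String]): Unit = {
    val join = new LatestValueLeftJoin[String, String]
    println(join.onLeft("a")) // (a,None): nothing cached yet
    join.onRight("x")
    println(join.onLeft("b")) // (b,Some(x))
    join.onRight("y")
    join.onRight("z")         // overwrites "y"; only the last value is kept
    println(join.onLeft("c")) // (c,Some(z))
  }
}
```

Keeping only one value bounds the state to a single element regardless of how fast the inner stream produces data, which is why a full window is not needed here.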

> Support for SQL inner queries for proctime
> --
>
> Key: FLINK-6073
> URL: https://issues.apache.org/jira/browse/FLINK-6073
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: radu
>Assignee: radu
>Priority: Critical
>  Labels: features
> Attachments: innerquery.png
>
>
> Time target: Proc Time
> **SQL targeted query examples:**
>  
> Q1) `Select  item, (select item2 from stream2 ) as itemExtern from stream1;`
> Comments: This is the main functionality targeted by this JIRA: combining the
> results of an inner query into the output of the main query.
> Q2) `Select  s1.item, (Select a2 from table as t2 where table.id = s1.id  
> limit 1) from s1;`
> Comments:
> Another equivalent way to write the first inner-query example is with
> LIMIT 1. This ensures equivalence with the SingleElementAggregation that is
> used when the main target syntax for the inner query is translated. We must
> ensure that both syntaxes are supported and implemented with the same
> functionality. The inner query could also select elements from a table
> rather than only from a different stream; supporting this should be handled
> in a separate sub-JIRA issue.
> **Description:**
> When Calcite parses the SQL inner query, it translates it into a join (a left
> join with an always-true condition) between the output of the query on the
> main stream and the output of a single-value aggregation on the inner query.
> The translation logic is shown below:
> ```
> LogicalJoin [condition=true;type=LEFT]
>   LogicalSingleValue[type=aggregation]
>     …logic of inner query (LogicalProject, LogicalScan…)
>   …logic of main, external query (LogicalProject, LogicalScan…)
> ```
> `LogicalJoin[condition=true;type=LEFT]`: this can be considered a special-case
> operation rather than a proper stream-to-stream join. The implementation
> should attach to each output of the main stream a value produced by a
> different query.
> `LogicalSingleValue[type=aggregation]`: this can be interpreted as the holder
> of the single value that results from the inner query. Because this operator
> guarantees that the inner query contributes no more than one value to the
> join, there are several options for interpreting its functionality in the
> streaming context:
> 1. Throw an error if the inner query returns more than one result. This would
> be the typical behavior of standard SQL over a database. However, it is very
> unlikely that a stream emits only a single value, so this behavior would be
> very limiting for streams in the inner query. It might be more useful and
> common when the inner query is over a table.
> 2. Interpret this operator as the guarantee that only one value is selected
> at any given moment. The behavior then acts as a filter that selects one
> value, which allows the output of this operator to evolve over time with the
> second stream that drives the inner query. When the value evolves should
> depend on what marks the progress of the stream (processing time,
> watermarks/event time, ingestion time, window time partitions…).
> In this JIRA issue, the evolution would be marked by processing time, and the
> operator would work according to option 2. Hence, at every moment, the single
> value held in the operator's state can be updated with the latest elements.
> The inner query therefore always selects the last element (its fields, or
> other query-related transformations based on the last value). This behavior
> is needed in many scenarios, e.g., the typical problem of computing total
> income when incomes arrive in multiple currencies and the total must be
> computed in one currency, always using the latest exchange rate.
> This behavior is also motivated by the functionality of the third SQL query
> example, Q3 (using an inner query as the input source in FROM). In such
> scenarios, the selection in the main query would need to be based on the
> latest elements. With this behavior, the two types of queries (Q1 and Q3)
> would therefore provide the same, intuitive result.
> **Functionality example**
> Based on the logical translation plan, we next illustrate the behavior of the
> inner query applied on 2 
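The two interpretations of `LogicalSingleValue` listed in the description above can be contrasted in a few lines of plain Scala. This is a sketch under assumed names: `SingleValueSemantics`, `strict`, and `latest` are hypothetical and are not Flink or Calcite APIs. Option 1 fails as soon as the inner query yields more than one row, while option 2 simply keeps the most recent value.

```scala
// Hypothetical helpers contrasting the two readings of LogicalSingleValue
// over the rows an inner query has produced so far.
object SingleValueSemantics {
  /** Option 1 (classic SQL): at most one row is allowed, otherwise fail. */
  def strict[A](rows: Seq[A]): Option[A] = rows match {
    case Seq()  => None
    case Seq(a) => Some(a)
    case _      => throw new IllegalStateException(s"inner query returned ${rows.size} rows")
  }

  /** Option 2 (proposed for proctime): the value is whatever arrived last. */
  def latest[A](rows: Seq[A]): Option[A] = rows.lastOption
}
```

For example, `strict(Seq(1, 2))` throws, whereas `latest(Seq(1, 2))` returns `Some(2)`; on an unbounded stream only the second reading can keep producing results.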

[jira] [Commented] (FLINK-6073) Support for SQL inner queries for proctime

2017-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940872#comment-15940872
 ] 

ASF GitHub Bot commented on FLINK-6073:
---

Github user rtudoran commented on the issue:

https://github.com/apache/flink/pull/3609
  
@fhueske @twalthr @sunjincheng121 @shijinkui @stefanobortoli @hongyuhong

I have made a first draft implementation that supports inner queries,
mainly for processing time. I would greatly appreciate your feedback to
further improve the approach.

The idea of the implementation is described in 
https://issues.apache.org/jira/browse/FLINK-6073?filter=-2




[jira] [Commented] (FLINK-6073) Support for SQL inner queries for proctime

2017-03-20 Thread radu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15932636#comment-15932636
 ] 

radu commented on FLINK-6073:
-

I will start implementing support for this rule.
[~fhueske] [~twalthr] [~shijinkui] [~stefano.bortoli]

> **Functionality example**
> Based on the logical translation plan, we next illustrate the behavior of the
> inner query applied on two streams that operate on processing time:
> `SELECT amount, (SELECT exchange FROM inputstream1) AS field1 FROM inputstream2`
>
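The functionality example above, combined with the exchange-rate scenario from the description, can be sketched as follows, assuming both streams are merged in processing-time arrival order. The event types `Exchange` and `Amount` and the object `ExchangeRateExample` are hypothetical names chosen for illustration; an `Amount` that arrives before any `Exchange` gets `None`, mirroring the NULL that the LEFT join would produce.

```scala
// Hypothetical event model for the two processing-time streams in the query above.
sealed trait Event
final case class Exchange(rate: BigDecimal) extends Event // inputstream1 (inner query)
final case class Amount(value: BigDecimal) extends Event  // inputstream2 (main query)

object ExchangeRateExample {
  /** Rows of `SELECT amount, (SELECT exchange FROM inputstream1) AS field1 ...`,
    * given both streams merged in processing-time arrival order. */
  def rows(events: Seq[Event]): Seq[(BigDecimal, Option[BigDecimal])] = {
    var latestRate: Option[BigDecimal] = None // single cached value of the inner query
    val out = Seq.newBuilder[(BigDecimal, Option[BigDecimal])]
    events.foreach {
      case Exchange(rate) => latestRate = Some(rate)        // inner stream: overwrite
      case Amount(value)  => out += ((value, latestRate))   // main stream: emit now
    }
    out.result()
  }

  /** Total income in the target currency, using the rate current at each arrival;
    * rows without a rate are skipped, as SQL SUM skips NULLs. */
  def total(events: Seq[Event]): BigDecimal =
    rows(events).collect { case (amount, Some(rate)) => amount * rate }.sum
}
```

For example, the arrival order `Amount(10), Exchange(2), Amount(5), Amount(1), Exchange(3), Amount(4)` yields the rows `(10, None)`, `(5, Some(2))`, `(1, Some(2))`, `(4, Some(3))` and a total of 24 (5·2 + 1·2 + 4·3), since the first amount arrives before any rate is known.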