[ 
https://issues.apache.org/jira/browse/FLINK-6073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15942267#comment-15942267
 ] 

ASF GitHub Bot commented on FLINK-6073:
---------------------------------------

Github user shijinkui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3609#discussion_r108059875
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
 ---
    @@ -0,0 +1,241 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{ RelOptCluster, RelTraitSet }
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.rel.core.AggregateCall
    +import org.apache.calcite.rel.{ RelNode, RelWriter, BiRel }
    +import org.apache.flink.api.java.tuple.Tuple
    +import org.apache.flink.streaming.api.datastream.{ AllWindowedStream, 
DataStream, KeyedStream, WindowedStream }
    +import org.apache.flink.streaming.api.windowing.assigners._
    +import org.apache.flink.streaming.api.windowing.time.Time
    +import org.apache.flink.streaming.api.windowing.windows.{ Window => 
DataStreamWindow }
    +import org.apache.flink.table.api.StreamTableEnvironment
    +import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
    +import org.apache.flink.table.calcite.FlinkTypeFactory
    +import org.apache.flink.table.expressions._
    +import org.apache.flink.table.plan.logical._
    +import org.apache.flink.table.plan.nodes.CommonAggregate
    +import org.apache.flink.table.plan.nodes.datastream.DataStreamAggregate._
    +import org.apache.flink.table.runtime.aggregate.AggregateUtil._
    +import org.apache.flink.table.runtime.aggregate._
    +import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
    +import org.apache.flink.table.typeutils.{ RowIntervalTypeInfo, 
TimeIntervalTypeInfo }
    +import org.apache.flink.types.Row
    +import org.apache.calcite.rel.logical.LogicalJoin
    +import org.apache.calcite.rel.core.JoinRelType
    +import org.apache.flink.table.api.TableException
    +import org.apache.flink.api.java.functions.KeySelector
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.streaming.api.windowing.triggers.Trigger
    +import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
    +import 
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
    +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult
    +import 
org.apache.flink.streaming.api.datastream.CoGroupedStreams.TaggedUnion
    +import org.apache.flink.streaming.api.windowing.evictors.Evictor
    +import 
org.apache.flink.streaming.api.windowing.evictors.Evictor.EvictorContext
    +import java.lang.Iterable
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
    +import 
org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue
    +import org.apache.flink.api.common.functions.RichFlatJoinFunction
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import org.apache.flink.util.Collector
    +
    +class DataStreamJoin(
    +  calc: LogicalJoin,
    +  cluster: RelOptCluster,
    +  traitSet: RelTraitSet,
    +  inputLeft: RelNode,
    +  inputRight: RelNode,
    +  rowType: RelDataType,
    +  description: String)
    +    extends BiRel(cluster, traitSet, inputLeft, inputRight) with 
DataStreamRel {
    +
    +  override def deriveRowType(): RelDataType = rowType
    +
    +  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
    +    new DataStreamJoin(
    +      calc,
    +      cluster,
    +      traitSet,
    +      inputs.get(0),
    +      inputs.get(1),
    +      rowType,
    +      description + calc.getId())
    +  }
    +
    +  override def toString: String = {
    +    s"Join(${
    +      if (!calc.getCondition.isAlwaysTrue()) {
    +        s"condition: (${calc.getCondition}), "
    +      } else {
    +        ""
    +      }
    +    }left: ($inputLeft), right($inputRight))"
    +  }
    +
    +  override def explainTerms(pw: RelWriter): RelWriter = {
    +    super.explainTerms(pw)
    +      .itemIf("condition", calc.getCondition, 
!calc.getCondition.isAlwaysTrue())
    +      .item("join", calc)
    +      .item("left", inputLeft)
    +      .item("right", inputRight)
    +  }
    +
    +  override def translateToPlan(tableEnv: StreamTableEnvironment): 
DataStream[Row] = {
    +
    +    val inputDSLeft = 
inputLeft.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
    +    val inputDSRight = 
inputRight.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
    +
    +    //define the setup for various types of joins to be supported
    +    (calc.getCondition.isAlwaysTrue(), calc.getJoinType) match {
    +      case (true, JoinRelType.LEFT) =>
    +        createInnerQueryJoin(inputDSLeft, inputDSRight)
    +      case (_, _) =>
    +        throw new TableException("Table does not support this type of 
JOIN.")
    +    }
    +
    +    null
    +  }
    +
    +  def createInnerQueryJoin(
    +    inputDSLeft: DataStream[Row], inputDSRight: DataStream[Row]): 
DataStream[Row] = {
    +
    +    // get the output types
    +    val rowTypeInfo = 
FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
    +
    +    val result = inputDSLeft.join(inputDSRight)
    +      .where(new EmptyKeySelector()).equalTo(new EmptyKeySelector())
    +      .window(GlobalWindows.create())
    +      .trigger(new ProcTimeLeftJoinTrigger())
    +      .evictor(new FullEvictor())
    +      .apply(new JoinProcTimeForInnerQuerry(rowTypeInfo))
    +
    +    null
    --- End diff --
    
    Why return null?


> Support for SQL inner queries for proctime
> ------------------------------------------
>
>                 Key: FLINK-6073
>                 URL: https://issues.apache.org/jira/browse/FLINK-6073
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: radu
>            Assignee: radu
>            Priority: Critical
>              Labels: features
>         Attachments: innerquery.png
>
>
> Time target: Proc Time
> **SQL targeted query examples:**
>  
> Q1) `Select  item, (select item2 from stream2 ) as itemExtern from stream1;`
> Comments: This is the main functionality targeted by this JIRA: enabling the 
> main query to include results from an inner query.
> Q2) `Select  s1.item, (Select a2 from table as t2 where table.id = s1.id  
> limit 1) from s1;`
> Comments:
> Another equivalent way to write the first example of inner query is with 
> limit 1. This ensures the equivalency with the SingleElementAggregation used 
> when translating the main target syntax for inner queries. We must ensure that 
> the 2 syntaxes are supported and implemented with the same functionality. 
> There is also the option to select elements in the inner query from a table, 
> not just from a different stream. Implementing this support should be a 
> sub-JIRA issue.
> **Description:**
> When parsed via Calcite, the SQL inner query is translated to a join function 
> (left join with always true condition) between the output of the query on the 
> main stream and the output of a single output aggregation operation on the 
> inner query. The translation logic is shown below:
> ```
> LogicalJoin [condition=true;type=LEFT]
>       LogicalSingleValue[type=aggregation]
>               …logic of inner query (LogicalProject, LogicalScan…)
>       …logic of main, external query (LogicalProject, LogicalScan…)
> ```
> `LogicalJoin[condition=true;type=LEFT] `– it can be considered as a special 
> case operation rather than a proper join to be implemented between 
> stream-to-stream. The implementation behavior should attach to the main 
> stream output a value from a different query. 
> `LogicalSingleValue[type=aggregation]` – it can be interpreted as the holder 
> of the single value that results from the inner query. As this operator is 
> the guarantee that the inner query will bring to the join no more than one 
> value, there are several options on how to consider its functionality in the 
> streaming context:
> 1.    Throw an error if the inner query returns more than one result. This 
> would be a typical behavior in the case of standard SQL over DB. However, it 
> is very unlikely that a stream would only emit a single value. Therefore, 
> such a behavior would be very limited for streams in the inner query. 
> However, such a behavior might be more useful and common if the inner query 
> is over a table. 
> 2.    We can interpret the usage of this parameter as the guarantee that at 
> one moment only one value is selected. Therefore the behavior would rather be 
> as a filter to select one value. This brings the option that the output of 
> this operator evolves in time with the second stream that drives the inner 
> query. The decision on when to evolve the stream should depend on what marks 
> the evolution of the stream (processing time, watermarks/event time, 
> ingestion time, window time partitions…).
>  In this JIRA issue the evolution would be marked by the processing time. For 
> this implementation the operator would work based on option 2. Hence at every 
> moment the state of the operator that holds one value can evolve with the 
> last elements. In this way the logic of the inner query is to select always 
> the last element (fields, or other query related transformations based on the 
> last value). This behavior is needed in many scenarios: (e.g., the typical 
> problem of computing the total income, when incomes are in multiple 
> currencies and the total needs to be computed in one currency by using always 
> the last exchange rate).
> This behavior is motivated also by the functionality of the 3rd SQL query 
> example – Q3  (using inner query as the input source for FROM ). In such 
> scenarios, the selection in the main query would need to be done based on 
> latest elements. Therefore with such a behavior the 2 types of queries (Q1 
> and Q3) would provide the same, intuitive result.
> **Functionality example**
> Based on the logical translation plan, we exemplify next the behavior of the 
> inner query applied on 2 streams that operate on processing time.
> SELECT amount, (SELECT exchange FROM inputstream1) AS field1 FROM inputstream2
>  ||Time||Stream1||Stream2||Output||
> |T1|      |   1.2|             | 
> |T2|User1,10|    |     (10,1.2)|
> |T3|User2,11|            |     (11,1.2)|
> |T4|          |   1.3|             |     
> |T5|User3,9 |    |      (9,1.3)|
> |...|
> Note 1. For streams that would operate on event time, at moment T4 we would 
> need to retract the previous outputs ((10,1.2), (11,1.2)) and re-emit them 
> as ((10,1.3), (11,1.3)). 
> Note 2. Rather than failing when a new value comes in the inner query we just 
> update the state that holds the single value. If option 1 for the behavior of 
> LogicalSingleValue is chosen, then an error should be triggered at moment T4.
> **Implementation option**
> Considering the notes and the option for the behavior, the operator would be 
> implemented by using the join function of Flink with a custom always true 
> join condition and an inner selection for the output based on the incoming 
> direction (to mimic the left join). The single value selection can be 
> implemented over a stateful flat map. In case the join is executed in 
> parallel by multiple operators, then we either use a parallelism of 1 for the 
> stateful flatmap (option 1) or we broadcast the outputs of the flatmap to 
> all join instances to ensure consistency of the results (option 2). 
> Considering that the flatMap functionality of selecting one value is light, 
> option 1 is better.  The design schema is shown below.
> !innerquery.png!
> **General logic of Join**
> ```
> leftDataStream.join(rightDataStream)
>                  .where(new ConstantConditionSelector())
>                  .equalTo(new ConstantConditionSelector())
>                 .window(window.create())
>                 .trigger(new LeftFireTrigger())
>                 .evictor(new Evictor())
>                .apply(JoinFunction());
> ```



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to