[ https://issues.apache.org/jira/browse/FLINK-6073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15942267#comment-15942267 ]
ASF GitHub Bot commented on FLINK-6073: --------------------------------------- Github user shijinkui commented on a diff in the pull request: https://github.com/apache/flink/pull/3609#discussion_r108059875 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala --- @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.nodes.datastream + +import org.apache.calcite.plan.{ RelOptCluster, RelTraitSet } +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.AggregateCall +import org.apache.calcite.rel.{ RelNode, RelWriter, BiRel } +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.streaming.api.datastream.{ AllWindowedStream, DataStream, KeyedStream, WindowedStream } +import org.apache.flink.streaming.api.windowing.assigners._ +import org.apache.flink.streaming.api.windowing.time.Time +import org.apache.flink.streaming.api.windowing.windows.{ Window => DataStreamWindow } +import org.apache.flink.table.api.StreamTableEnvironment +import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.expressions._ +import org.apache.flink.table.plan.logical._ +import org.apache.flink.table.plan.nodes.CommonAggregate +import org.apache.flink.table.plan.nodes.datastream.DataStreamAggregate._ +import org.apache.flink.table.runtime.aggregate.AggregateUtil._ +import org.apache.flink.table.runtime.aggregate._ +import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval +import org.apache.flink.table.typeutils.{ RowIntervalTypeInfo, TimeIntervalTypeInfo } +import org.apache.flink.types.Row +import org.apache.calcite.rel.logical.LogicalJoin +import org.apache.calcite.rel.core.JoinRelType +import org.apache.flink.table.api.TableException +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.streaming.api.windowing.triggers.Trigger +import org.apache.flink.streaming.api.windowing.windows.GlobalWindow +import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult +import 
org.apache.flink.streaming.api.datastream.CoGroupedStreams.TaggedUnion +import org.apache.flink.streaming.api.windowing.evictors.Evictor +import org.apache.flink.streaming.api.windowing.evictors.Evictor.EvictorContext +import java.lang.Iterable +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue +import org.apache.flink.api.common.functions.RichFlatJoinFunction +import org.apache.flink.configuration.Configuration +import org.apache.flink.api.common.state.ValueState +import org.apache.flink.api.common.state.ValueStateDescriptor +import org.apache.flink.util.Collector + +class DataStreamJoin( + calc: LogicalJoin, + cluster: RelOptCluster, + traitSet: RelTraitSet, + inputLeft: RelNode, + inputRight: RelNode, + rowType: RelDataType, + description: String) + extends BiRel(cluster, traitSet, inputLeft, inputRight) with DataStreamRel { + + override def deriveRowType(): RelDataType = rowType + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { + new DataStreamJoin( + calc, + cluster, + traitSet, + inputs.get(0), + inputs.get(1), + rowType, + description + calc.getId()) + } + + override def toString: String = { + s"Join(${ + if (!calc.getCondition.isAlwaysTrue()) { + s"condition: (${calc.getCondition}), " + } else { + "" + } + }left: ($inputLeft), right($inputRight))" + } + + override def explainTerms(pw: RelWriter): RelWriter = { + super.explainTerms(pw) + .itemIf("condition", calc.getCondition, !calc.getCondition.isAlwaysTrue()) + .item("join", calc) + .item("left", inputLeft) + .item("right", inputRight) + } + + override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = { + + val inputDSLeft = inputLeft.asInstanceOf[DataStreamRel].translateToPlan(tableEnv) + val inputDSRight = inputRight.asInstanceOf[DataStreamRel].translateToPlan(tableEnv) + + //define the setup for various types of joins to be supported + 
+ (calc.getCondition.isAlwaysTrue(), calc.getJoinType) match { + case (true, JoinRelType.LEFT) => + createInnerQueryJoin(inputDSLeft, inputDSRight) + case (_, _) => + throw new TableException("Table does not support this type of JOIN.") + } + + null + } + + def createInnerQueryJoin( + inputDSLeft: DataStream[Row], inputDSRight: DataStream[Row]): DataStream[Row] = { + + // get the output types + val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo] + + val result = inputDSLeft.join(inputDSRight) + .where(new EmptyKeySelector()).equalTo(new EmptyKeySelector()) + .window(GlobalWindows.create()) + .trigger(new ProcTimeLeftJoinTrigger()) + .evictor(new FullEvictor()) + .apply(new JoinProcTimeForInnerQuerry(rowTypeInfo)) + + null --- End diff -- why return null > Support for SQL inner queries for proctime > ------------------------------------------ > > Key: FLINK-6073 > URL: https://issues.apache.org/jira/browse/FLINK-6073 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL > Reporter: radu > Assignee: radu > Priority: Critical > Labels: features > Attachments: innerquery.png > > > Time target: Proc Time > **SQL targeted query examples:** > > Q1) `Select item, (select item2 from stream2 ) as itemExtern from stream1;` > Comments: This is the main functionality targeted by this JIRA: enabling the main query to > combine its results with the results of an inner query. > Q2) `Select s1.item, (Select a2 from table as t2 where table.id = s1.id limit 1) from s1;` > Comments: > Another equivalent way to write the first inner-query example is with > limit 1. This ensures equivalence with the SingleElementAggregation used > when translating the main target syntax for inner queries. We must ensure that > the 2 syntaxes are supported and implemented with the same functionality. > There is also the option to select elements in the inner query from a table, > not just from a different stream.
Support for this should be implemented in a separate sub-JIRA > issue. > **Description:** > When parsed via Calcite, the SQL inner query is translated to a join function > (a left join with an always-true condition) between the output of the query on the > main stream and the output of a single-output aggregation operation on the > inner query. The translation logic is shown below: > ``` > LogicalJoin [condition=true;type=LEFT] > LogicalSingleValue[type=aggregation] > …logic of inner query (LogicalProject, LogicalScan…) > …logic of main/external query (LogicalProject, LogicalScan…) > ``` > `LogicalJoin[condition=true;type=LEFT]` – it can be considered a special > case operation rather than a proper > stream-to-stream join. The implementation should attach to the main > stream output a value from a different query. > `LogicalSingleValue[type=aggregation]` – it can be interpreted as the holder > of the single value that results from the inner query. As this operator > guarantees that the inner query will bring no more than one > value to the join, there are several options for how to interpret its functionality in the > streaming context: > 1. Throw an error if the inner query returns more than one result. This > would be the typical behavior of standard SQL over a database. However, it > is very unlikely that a stream would emit only a single value. Therefore, > such a behavior would be very limiting for streams in the inner query. > However, such a behavior might be more useful and common if the inner query > is over a table. > 2. We can interpret the usage of this operator as the guarantee that at > any one moment only one value is selected. The behavior would then rather be > that of a filter selecting one value. This allows the output of > this operator to evolve in time with the second stream that drives the inner > query.
The decision on when to evolve the stream should depend on what marks > the evolution of the stream (processing time, watermarks/event time, > ingestion time, window time partitions…). > In this JIRA issue, the evolution is marked by processing time. For > this implementation, the operator works based on option 2. Hence, at every > moment, the operator state that holds the single value can evolve with the > latest elements. In this way, the logic of the inner query is to always select > the last element (fields, or other query-related transformations based on the > last value). This behavior is needed in many scenarios (e.g., the typical > problem of computing the total income when incomes are in multiple > currencies and the total must be computed in one currency, always using > the latest exchange rate). > This behavior is also motivated by the functionality of the 3rd SQL query > example – Q3 (using an inner query as the input source for FROM). In such > scenarios, the selection in the main query would need to be based on > the latest elements. Therefore, with such a behavior the 2 types of queries (Q1 > and Q3) would provide the same, intuitive result. > **Functionality example** > Based on the logical translation plan, we next illustrate the behavior of the > inner query applied to 2 streams that operate on processing time. > SELECT amount, (SELECT exchange FROM inputstream2) AS field1 FROM inputstream1 > ||Time||Stream1||Stream2||Output|| > |T1| | 1.2| | > |T2|User1,10| | (10,1.2)| > |T3|User2,11| | (11,1.2)| > |T4| | 1.3| | > |T5|User3,9 | | (9,1.3)| > |...| > Note 1. For streams that operate on event time, at moment T4 (when the new inner value 1.3 arrives) we would > need to retract the previous outputs ((10,1.2), (11,1.2)) and re-emit them > as ((10,1.3), (11,1.3)). > Note 2. Rather than failing when a new value arrives in the inner query, we just > update the state that holds the single value.
If option 1 for the behavior of > LogicalSingleValue were chosen, then an error would be triggered at moment T4. > **Implementation option** > Considering the notes and the chosen behavior option, the operator would be > implemented using Flink's join function with a custom always-true > join condition and an inner selection of the output based on the incoming > direction (to mimic the left join). The single-value selection can be > implemented with a stateful flatMap. If the join is executed in > parallel by multiple operators, then we either use a parallelism of 1 for the > stateful flatMap (option 1) or broadcast the outputs of the flatMap to > all join instances to ensure consistent results (option 2). > Because the flatMap logic of selecting one value is lightweight, > option 1 is better. The design schema is shown below. > !innerquery.png! > **General logic of Join** > ``` > leftDataStream.join(rightDataStream) > .where(new ConstantConditionSelector()) > .equalTo(new ConstantConditionSelector()) > .window(window.create()) > .trigger(new LeftFireTrigger()) > .evictor(new Evictor()) > .apply(JoinFunction()); > ``` -- This message was sent by Atlassian JIRA (v6.3.15#6346)