[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins URL: https://github.com/apache/flink/pull/6299#discussion_r219411672 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala ## @@ -191,12 +174,10 @@ case class RowtimeAttribute(expr: Expression) extends TimeAttribute(expr) { ValidationSuccess case WindowReference(_, _) => ValidationFailure("Reference to a rowtime or proctime window required.") - case TemporalTableReference(_, _) => -ValidationSuccess case any => ValidationFailure( - s"The '.rowtime' expression can only be used for table definitions, windows, " + -s" and temporal tables, while [${any}] was found.") + s"The '.rowtime' expression can only be used for table definitions, windows " + +s"and temporal table definitions, while [$any] was found.") Review comment: Yes, you are right :) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins URL: https://github.com/apache/flink/pull/6299#discussion_r219411313 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala ## @@ -175,13 +173,41 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle { val rewrittenTemporalJoin = temporalJoin.copy(temporalJoin.getTraitSet, List(left, right)) -// Materialize all of the time attributes from the right side of temporal join -val indicesToMaterialize = - (left.getRowType.getFieldCount until rewrittenTemporalJoin.getRowType.getFieldCount).toSet +val indicesToMaterialize = gatherIndicesToMaterialize(rewrittenTemporalJoin, left, right) materializerUtils.projectAndMaterializeFields(rewrittenTemporalJoin, indicesToMaterialize) } + private def gatherIndicesToMaterialize( + temporalJoin: Join, + left: RelNode, + right: RelNode) +: Set[Int] = { + +// Materialize all of the time attributes from the right side of temporal join +var indicesToMaterialize = + (left.getRowType.getFieldCount until temporalJoin.getRowType.getFieldCount).toSet + +if (!hasRowtimeAttribute(right.getRowType)) { Review comment: No, I forgot. Adding one now. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins URL: https://github.com/apache/flink/pull/6299#discussion_r219410591 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala ## @@ -175,13 +173,41 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle { val rewrittenTemporalJoin = temporalJoin.copy(temporalJoin.getTraitSet, List(left, right)) -// Materialize all of the time attributes from the right side of temporal join -val indicesToMaterialize = - (left.getRowType.getFieldCount until rewrittenTemporalJoin.getRowType.getFieldCount).toSet +val indicesToMaterialize = gatherIndicesToMaterialize(rewrittenTemporalJoin, left, right) materializerUtils.projectAndMaterializeFields(rewrittenTemporalJoin, indicesToMaterialize) } + private def gatherIndicesToMaterialize( Review comment: Fixed, but IMO in the visitor pattern it is better to follow this pattern, keeping each `visit` overload grouped with its own private helpers: ``` public visit(X); private doSthWith(X); private doSthElseWith(X); public visit(Y); private doSthWith(Y); private doSthElseWith(Y); ``` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins URL: https://github.com/apache/flink/pull/6299#discussion_r219214189 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala ## @@ -166,9 +167,20 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle { val right = join.getRight.accept(this) LogicalJoin.create(left, right, join.getCondition, join.getVariablesSet, join.getJoinType) - } + def visit(temporalJoin: LogicalTemporalTableJoin): RelNode = { +val left = temporalJoin.getLeft.accept(this) +val right = temporalJoin.getRight.accept(this) + +val rewrittenTemporalJoin = temporalJoin.copy(temporalJoin.getTraitSet, List(left, right)) + +// Materialize all of the time attributes from the right side of temporal join +val indicesToMaterialize = Review comment: As we discussed, it is probably better to materialise it to make it more consistent. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins URL: https://github.com/apache/flink/pull/6299#discussion_r219213980 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala ## @@ -442,11 +402,63 @@ class RexTimeIndicatorMaterializer( // materialize function's result and operands case _ if isTimeIndicatorType(updatedCall.getType) => -updatedCall.clone(timestamp, materializedOperands) +updatedCall.clone(materializerUtils.getTimestamp, materializedOperands) // materialize function's operands only case _ => updatedCall.clone(updatedCall.getType, materializedOperands) } } } + +/** + * Helper class for shared logic of materializing time attributes in [[RelNode]] and [[RexNode]]. + */ +class RexTimeIndicatorMaterializerUtils(rexBuilder: RexBuilder) { + + private val timestamp = rexBuilder +.getTypeFactory +.asInstanceOf[FlinkTypeFactory] +.createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP, isNullable = false) + + def getTimestamp: RelDataType = { +timestamp + } + + def projectAndMaterializeFields(input: RelNode, indicesToMaterialize: Set[Int]) : RelNode = { +val projects = input.getRowType.getFieldList.map { field => + materializeIfContains( +new RexInputRef(field.getIndex, field.getType), +field.getIndex, +indicesToMaterialize) +} + +LogicalProject.create( Review comment: I know, but I think there is no need to complicate the logic here (and in other similar places) when we can rely on other simple optimisation rules, like merging projections. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins URL: https://github.com/apache/flink/pull/6299#discussion_r219212679 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ## @@ -158,36 +158,60 @@ class Table( /** * Creates [[TemporalTableFunction]] backed up by this table as a history table. -* -* @param timeIndicator field for the [[TemporalTableFunction]]. Must points to a time indicator -* @param primaryKeyfield for the [[TemporalTableFunction]]. -* @return [[TemporalTableFunction]] +* Temporal Tables represent a concept of a table that changes over time and for which +* Flink keeps track of those changes. [[TemporalTableFunction]] provides a way how to access Review comment: I would keep `access` because we have a story in the backlog that would enable writing the following query: ``` SELECT * FROM Rates('2017/06/14 14:23:12'); ``` Otherwise the doc would almost certainly drift out of sync :) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins URL: https://github.com/apache/flink/pull/6299#discussion_r219212679 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ## @@ -158,36 +158,60 @@ class Table( /** * Creates [[TemporalTableFunction]] backed up by this table as a history table. -* -* @param timeIndicator field for the [[TemporalTableFunction]]. Must points to a time indicator -* @param primaryKeyfield for the [[TemporalTableFunction]]. -* @return [[TemporalTableFunction]] +* Temporal Tables represent a concept of a table that changes over time and for which +* Flink keeps track of those changes. [[TemporalTableFunction]] provides a way how to access Review comment: I would keep `access` because we have a story in the backlog that would enable writing the following query: ``` SELECT * FROM Rates('2017/06/14 14:23:12'); ``` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins URL: https://github.com/apache/flink/pull/6299#discussion_r218414712 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ## @@ -803,70 +803,37 @@ abstract class StreamTableEnvironment( * @return The optimized [[RelNode]] tree */ private[flink] def optimize(relNode: RelNode, updatesAsRetraction: Boolean): RelNode = { - -// 0. convert sub-queries before query decorrelation -val convSubQueryPlan = runHepPlanner( - HepMatchOrder.BOTTOM_UP, FlinkRuleSets.TABLE_SUBQUERY_RULES, relNode, relNode.getTraitSet) - -// 0. convert table references -val fullRelNode = runHepPlanner( - HepMatchOrder.BOTTOM_UP, - FlinkRuleSets.TABLE_REF_RULES, - convSubQueryPlan, - relNode.getTraitSet) - -// 1. decorrelate +val convSubQueryPlan = optimizeConvertSubQueries(relNode) +val temporalTableJoinPlan = optimizeConvertToTemporalJoin(convSubQueryPlan) +val fullRelNode = optimizeConvertTableReferences(temporalTableJoinPlan) val decorPlan = RelDecorrelator.decorrelateQuery(fullRelNode) - -// 2. convert time indicators val convPlan = RelTimeIndicatorConverter.convert(decorPlan, getRelBuilder.getRexBuilder) Review comment: Any suggestions maybe? I renamed it to `planWithMaterializedTimeAttributes` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins URL: https://github.com/apache/flink/pull/6299#discussion_r218422026 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalTemporalTableJoin.scala ## @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.logical.rel + +import java.util.Collections + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.core._ +import org.apache.calcite.rex.{RexBuilder, RexNode} +import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes} +import org.apache.calcite.sql.{SqlFunction, SqlFunctionCategory, SqlKind} +import org.apache.flink.util.Preconditions.checkArgument + +/** + * Represents a join between a table and [[org.apache.flink.table.functions.TemporalTableFunction]] + * + * @param cluster + * @param traitSet + * @param left stream + * @param right table scan of underlying + * [[org.apache.flink.table.functions.TemporalTableFunction]] + * @param condition must contain [[LogicalTemporalTableJoin#TEMPORAL_JOIN_CONDITION]] with + * correctly defined references to rightTimeAttribute, + * rightPrimaryKeyExpression and leftTimeAttribute. We can not implement + * those references as separate fields, because of problems with Calcite's + * optimization rules like projections push downs, column + * pruning/renaming/reordering, etc. Later rightTimeAttribute, + * rightPrimaryKeyExpression and leftTimeAttribute will be extracted from + * the condition. 
+ */ +class LogicalTemporalTableJoin private( +cluster: RelOptCluster, +traitSet: RelTraitSet, +left: RelNode, +right: RelNode, +condition: RexNode) + extends Join( +cluster, +traitSet, +left, +right, +condition, +Collections.emptySet().asInstanceOf[java.util.Set[CorrelationId]], +JoinRelType.INNER) { + + override def copy( + traitSet: RelTraitSet, + condition: RexNode, + left: RelNode, + right: RelNode, + joinType: JoinRelType, + semiJoinDone: Boolean): LogicalTemporalTableJoin = { +checkArgument(joinType == this.getJoinType, + "Can not change join type".asInstanceOf[Object]) +checkArgument(semiJoinDone == this.isSemiJoinDone, + "Can not change semiJoinDone".asInstanceOf[Object]) +new LogicalTemporalTableJoin( + cluster, + traitSet, + left, + right, + condition) + } +} + +object LogicalTemporalTableJoin { + /** +* See [[LogicalTemporalTableJoin#condition]] +*/ + val TEMPORAL_JOIN_CONDITION = new SqlFunction( +"__TEMPORAL_JOIN_CONDITION", +SqlKind.OTHER_FUNCTION, +ReturnTypes.BOOLEAN_NOT_NULL, +null, +OperandTypes.or( + OperandTypes.sequence( +"'(LEFT_TIME_ATTRIBUTE, RIGHT_TIME_ATTRIBUTE, PRIMARY_KEY)'", +OperandTypes.DATETIME, +OperandTypes.DATETIME, +OperandTypes.ANY), + OperandTypes.sequence( +"'(LEFT_TIME_ATTRIBUTE, PRIMARY_KEY)'", +OperandTypes.DATETIME, +OperandTypes.ANY)), +SqlFunctionCategory.SYSTEM) + + def makeRowTimeTemporalJoinConditionCall( + rexBuilder: RexBuilder, + leftTimeAttribute: RexNode, + rightTimeAttribute: RexNode, + rightPrimaryKeyExpression: RexNode): RexNode = { +rexBuilder.makeCall( + TEMPORAL_JOIN_CONDITION, + leftTimeAttribute, + rightTimeAttribute, + rightPrimaryKeyExpression) + } + + def makeProcTimeTemporalJoinConditionCall( + rexBuilder: RexBuilder, + leftTimeAttribute: RexNode, + rightPrimaryKeyExpression: RexNode): RexNode = { +rexBuilder.makeCall( + TEMPORAL_JOIN_CONDITION, + leftTimeAttribute, + rightPrimaryKeyExpression) + } + + /** +* See [[LogicalTemporalTableJoin]] +*/ + def create( + rexBuilder: RexBuilder, + cluster: 
RelOptCluster, + traitSet: RelTraitSet,
[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins URL: https://github.com/apache/flink/pull/6299#discussion_r218439267 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToTemporalTableJoinRule.scala ## @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.rules.logical + +import org.apache.calcite.plan.RelOptRule.{any, none, operand, some} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.core.TableFunctionScan +import org.apache.calcite.rel.logical.LogicalCorrelate +import org.apache.calcite.rex._ +import org.apache.flink.table.api.{Table, Types, ValidationException} +import org.apache.flink.table.calcite.FlinkTypeFactory.isProctimeIndicatorType +import org.apache.flink.table.expressions._ +import org.apache.flink.table.functions.TemporalTableFunction +import org.apache.flink.table.functions.utils.TableSqlFunction +import org.apache.flink.table.plan.logical.rel.LogicalTemporalTableJoin +import org.apache.flink.table.plan.util.RexDefaultVisitor + +class LogicalCorrelateToTemporalTableJoinRule + extends RelOptRule( +operand(classOf[LogicalCorrelate], + some( +operand(classOf[RelNode], any()), +operand(classOf[TableFunctionScan], none(, +"LogicalCorrelateToTemporalTableJoinRule") { + + def extractNameFromTimeAttribute(timeAttribute: Expression): String = { +timeAttribute match { + case RowtimeAttribute(expr: TemporalTableReference) => +expr.name + case ProctimeAttribute(expr: TemporalTableReference) => +expr.name + case TemporalTableReference(name, _) => +name + case ResolvedFieldReference(name, _) +if timeAttribute.resultType == Types.LONG || + timeAttribute.resultType == Types.SQL_TIMESTAMP => +name + case _ => throw new ValidationException( +s"Invalid timeAttribute [${timeAttribute}] in TemporalTableFunction") +} + } + + override def onMatch(call: RelOptRuleCall): Unit = { +val logicalCorrelate: LogicalCorrelate = call.rel(0) +val leftNode: RelNode = call.rel(1) +val rightTableFunctionScan: TableFunctionScan = call.rel(2) + +val cluster = logicalCorrelate.getCluster + +new GetTemporalTableFunctionCall(cluster.getRexBuilder, leftNode) + .visit(rightTableFunctionScan.getCall) match { + case 
Some(TemporalTableFunctionCall(rightTemporalTableFunction, leftTimeAttribute)) => +val underlyingHistoryTable: Table = rightTemporalTableFunction.getUnderlyingHistoryTable +val relBuilder = this.relBuilderFactory.create( + cluster, + underlyingHistoryTable.relBuilder.getRelOptSchema) +val rexBuilder = cluster.getRexBuilder + +val rightNode: RelNode = underlyingHistoryTable.logicalPlan.toRelNode(relBuilder) + +val rightTimeIndicatorExpression = createRightExpression( + rexBuilder, + leftNode, + rightNode, + extractNameFromTimeAttribute(rightTemporalTableFunction.getTimeAttribute)) + +val rightPrimaryKeyExpression = createRightExpression( + rexBuilder, + leftNode, + rightNode, + rightTemporalTableFunction.getPrimaryKey) + +relBuilder.push( + if (isProctimeIndicatorType(rightTemporalTableFunction.getTimeAttribute.resultType)) { +LogicalTemporalTableJoin.create( + rexBuilder, + cluster, + logicalCorrelate.getTraitSet, + leftNode, + rightNode, + leftTimeAttribute, + rightPrimaryKeyExpression) + } + else { +LogicalTemporalTableJoin.create( + rexBuilder, + cluster, + logicalCorrelate.getTraitSet, + leftNode, + rightNode, + leftTimeAttribute, + rightTimeIndicatorExpression, + rightPrimaryKeyExpression) + }) +call.transf
[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins URL: https://github.com/apache/flink/pull/6299#discussion_r218442168 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToTemporalTableJoinRule.scala ## @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.rules.logical + +import org.apache.calcite.plan.RelOptRule.{any, none, operand, some} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.core.TableFunctionScan +import org.apache.calcite.rel.logical.LogicalCorrelate +import org.apache.calcite.rex._ +import org.apache.flink.table.api.{Table, Types, ValidationException} +import org.apache.flink.table.calcite.FlinkTypeFactory.isProctimeIndicatorType +import org.apache.flink.table.expressions._ +import org.apache.flink.table.functions.TemporalTableFunction +import org.apache.flink.table.functions.utils.TableSqlFunction +import org.apache.flink.table.plan.logical.rel.LogicalTemporalTableJoin +import org.apache.flink.table.plan.util.RexDefaultVisitor + +class LogicalCorrelateToTemporalTableJoinRule + extends RelOptRule( +operand(classOf[LogicalCorrelate], + some( +operand(classOf[RelNode], any()), +operand(classOf[TableFunctionScan], none(, +"LogicalCorrelateToTemporalTableJoinRule") { + + def extractNameFromTimeAttribute(timeAttribute: Expression): String = { +timeAttribute match { + case RowtimeAttribute(expr: TemporalTableReference) => +expr.name + case ProctimeAttribute(expr: TemporalTableReference) => +expr.name + case TemporalTableReference(name, _) => +name + case ResolvedFieldReference(name, _) +if timeAttribute.resultType == Types.LONG || + timeAttribute.resultType == Types.SQL_TIMESTAMP => +name + case _ => throw new ValidationException( +s"Invalid timeAttribute [${timeAttribute}] in TemporalTableFunction") +} + } + + override def onMatch(call: RelOptRuleCall): Unit = { +val logicalCorrelate: LogicalCorrelate = call.rel(0) +val leftNode: RelNode = call.rel(1) +val rightTableFunctionScan: TableFunctionScan = call.rel(2) + +val cluster = logicalCorrelate.getCluster + +new GetTemporalTableFunctionCall(cluster.getRexBuilder, leftNode) + .visit(rightTableFunctionScan.getCall) match { + case 
Some(TemporalTableFunctionCall(rightTemporalTableFunction, leftTimeAttribute)) => +val underlyingHistoryTable: Table = rightTemporalTableFunction.getUnderlyingHistoryTable +val relBuilder = this.relBuilderFactory.create( + cluster, + underlyingHistoryTable.relBuilder.getRelOptSchema) +val rexBuilder = cluster.getRexBuilder + +val rightNode: RelNode = underlyingHistoryTable.logicalPlan.toRelNode(relBuilder) + +val rightTimeIndicatorExpression = createRightExpression( + rexBuilder, + leftNode, + rightNode, + extractNameFromTimeAttribute(rightTemporalTableFunction.getTimeAttribute)) + +val rightPrimaryKeyExpression = createRightExpression( + rexBuilder, + leftNode, + rightNode, + rightTemporalTableFunction.getPrimaryKey) + +relBuilder.push( + if (isProctimeIndicatorType(rightTemporalTableFunction.getTimeAttribute.resultType)) { +LogicalTemporalTableJoin.create( + rexBuilder, + cluster, + logicalCorrelate.getTraitSet, + leftNode, + rightNode, + leftTimeAttribute, + rightPrimaryKeyExpression) + } + else { +LogicalTemporalTableJoin.create( + rexBuilder, + cluster, + logicalCorrelate.getTraitSet, + leftNode, + rightNode, + leftTimeAttribute, + rightTimeIndicatorExpression, + rightPrimaryKeyExpression) + }) +call.transf
[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins URL: https://github.com/apache/flink/pull/6299#discussion_r218414712 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ## @@ -803,70 +803,37 @@ abstract class StreamTableEnvironment( * @return The optimized [[RelNode]] tree */ private[flink] def optimize(relNode: RelNode, updatesAsRetraction: Boolean): RelNode = { - -// 0. convert sub-queries before query decorrelation -val convSubQueryPlan = runHepPlanner( - HepMatchOrder.BOTTOM_UP, FlinkRuleSets.TABLE_SUBQUERY_RULES, relNode, relNode.getTraitSet) - -// 0. convert table references -val fullRelNode = runHepPlanner( - HepMatchOrder.BOTTOM_UP, - FlinkRuleSets.TABLE_REF_RULES, - convSubQueryPlan, - relNode.getTraitSet) - -// 1. decorrelate +val convSubQueryPlan = optimizeConvertSubQueries(relNode) +val temporalTableJoinPlan = optimizeConvertToTemporalJoin(convSubQueryPlan) +val fullRelNode = optimizeConvertTableReferences(temporalTableJoinPlan) val decorPlan = RelDecorrelator.decorrelateQuery(fullRelNode) - -// 2. convert time indicators val convPlan = RelTimeIndicatorConverter.convert(decorPlan, getRelBuilder.getRexBuilder) Review comment: Any suggestions maybe? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins URL: https://github.com/apache/flink/pull/6299#discussion_r218486273 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToTemporalTableJoinRule.scala ## @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.rules.logical + +import org.apache.calcite.plan.RelOptRule.{any, none, operand, some} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.core.TableFunctionScan +import org.apache.calcite.rel.logical.LogicalCorrelate +import org.apache.calcite.rex._ +import org.apache.flink.table.api.{Table, Types, ValidationException} +import org.apache.flink.table.calcite.FlinkTypeFactory.isProctimeIndicatorType +import org.apache.flink.table.expressions._ +import org.apache.flink.table.functions.TemporalTableFunction +import org.apache.flink.table.functions.utils.TableSqlFunction +import org.apache.flink.table.plan.logical.rel.LogicalTemporalTableJoin +import org.apache.flink.table.plan.util.RexDefaultVisitor + +class LogicalCorrelateToTemporalTableJoinRule + extends RelOptRule( +operand(classOf[LogicalCorrelate], + some( +operand(classOf[RelNode], any()), +operand(classOf[TableFunctionScan], none(, +"LogicalCorrelateToTemporalTableJoinRule") { + + def extractNameFromTimeAttribute(timeAttribute: Expression): String = { +timeAttribute match { + case RowtimeAttribute(expr: TemporalTableReference) => Review comment: Dropped This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins URL: https://github.com/apache/flink/pull/6299#discussion_r218462800 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamTemporalJoinToCoProcessTranslator.scala ## @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.nodes.datastream + +import org.apache.calcite.rel.core.{JoinInfo, JoinRelType} +import org.apache.calcite.rex._ +import org.apache.flink.api.common.functions.{FlatJoinFunction, MapFunction} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.{StreamQueryConfig, TableConfig, TableException, ValidationException} +import org.apache.flink.table.calcite.FlinkTypeFactory._ +import org.apache.flink.table.codegen.{FunctionCodeGenerator, GeneratedFunction} +import org.apache.flink.table.plan.logical.rel.LogicalTemporalTableJoin._ +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.table.plan.util.RexDefaultVisitor +import org.apache.flink.table.runtime.join.TemporalJoin +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} +import org.apache.flink.types.Row +import org.apache.flink.util.Preconditions.checkState + +class DataStreamTemporalJoinToCoProcessTranslator( +textualRepresentation: String, +config: TableConfig, +returnType: TypeInformation[Row], +leftSchema: RowSchema, +rightSchema: RowSchema, +joinInfo: JoinInfo, +rexBuilder: RexBuilder) + extends DataStreamJoinToCoProcessTranslator( +config, +returnType, +leftSchema, +rightSchema, +joinInfo, +rexBuilder) { + + var leftTimeAttribute: Option[RexNode] = None + + var rightTimeAttribute: Option[RexNode] = None + + var rightPrimaryKeyExpression: Option[RexNode] = None + + override val nonEquiJoinPredicates: Option[RexNode] = extractTemporalJoinCondition() + + def extractTemporalJoinCondition(): Option[RexNode] = { +checkState( + !joinInfo.isEqui, + "Missing %s in join condition", + TEMPORAL_JOIN_CONDITION) + +val nonEquiJoinRex: RexNode = joinInfo.getRemaining(rexBuilder) +val temporalJoinConditionExtractor = new TemporalJoinConditionExtractor( + nonEquiJoinRex.toString) + 
+Some(temporalJoinConditionExtractor.apply(nonEquiJoinRex)) + } + + override protected def createCoProcessFunction( + joinType: JoinRelType, + queryConfig: StreamQueryConfig, + joinFunction: GeneratedFunction[FlatJoinFunction[Row, Row, Row], Row]) +: CoProcessFunction[CRow, CRow, CRow] = { + +checkState( + leftTimeAttribute.isDefined && +rightPrimaryKeyExpression.isDefined, + "Missing %s in join condition", + TEMPORAL_JOIN_CONDITION) + +if (rightTimeAttribute.isDefined) { Review comment: If we add an additional flag, we just duplicate the same logic, since we would still need to handle this `Option` field: ``` if (isRowtimeMode) { throw new ValidationException( s"Currently only proctime temporal joins are supported in [${textualRepresentation}]") } checkState(!rightTimeAttribute.isDefined); ``` An alternative solution (IMO not worth the effort here) would be to split this class and have one for processing time and one for rowtime. Then we could skip this check altogether. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins URL: https://github.com/apache/flink/pull/6299#discussion_r218414391 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala ## @@ -448,48 +448,13 @@ abstract class BatchTableEnvironment( * @return The optimized [[RelNode]] tree */ private[flink] def optimize(relNode: RelNode): RelNode = { - -// 0. convert sub-queries before query decorrelation -val convSubQueryPlan = runHepPlanner( - HepMatchOrder.BOTTOM_UP, FlinkRuleSets.TABLE_SUBQUERY_RULES, relNode, relNode.getTraitSet) - -// 0. convert table references -val fullRelNode = runHepPlanner( - HepMatchOrder.BOTTOM_UP, - FlinkRuleSets.TABLE_REF_RULES, - convSubQueryPlan, - relNode.getTraitSet) - -// 1. decorrelate +val convSubQueryPlan = optimizeConvertSubQueries(relNode) Review comment: Isn't the general solution to define a `TableEnvironment` interface in Java and only extend it in Scala? Moving those methods to a static utility class doesn't seem like a proper solution. I think it would deserve a separate ticket. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins URL: https://github.com/apache/flink/pull/6299#discussion_r218478087 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamTemporalJoinToCoProcessTranslator.scala ## @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.nodes.datastream + +import org.apache.calcite.rel.core.{JoinInfo, JoinRelType} +import org.apache.calcite.rex._ +import org.apache.flink.api.common.functions.{FlatJoinFunction, MapFunction} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.{StreamQueryConfig, TableConfig, TableException, ValidationException} +import org.apache.flink.table.calcite.FlinkTypeFactory._ +import org.apache.flink.table.codegen.{FunctionCodeGenerator, GeneratedFunction} +import org.apache.flink.table.plan.logical.rel.LogicalTemporalTableJoin._ +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.table.plan.util.RexDefaultVisitor +import org.apache.flink.table.runtime.join.TemporalJoin +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} +import org.apache.flink.types.Row +import org.apache.flink.util.Preconditions.checkState + +class DataStreamTemporalJoinToCoProcessTranslator( +textualRepresentation: String, +config: TableConfig, +returnType: TypeInformation[Row], +leftSchema: RowSchema, +rightSchema: RowSchema, +joinInfo: JoinInfo, +rexBuilder: RexBuilder) + extends DataStreamJoinToCoProcessTranslator( +config, +returnType, +leftSchema, +rightSchema, +joinInfo, +rexBuilder) { + + var leftTimeAttribute: Option[RexNode] = None + + var rightTimeAttribute: Option[RexNode] = None + + var rightPrimaryKeyExpression: Option[RexNode] = None + + override val nonEquiJoinPredicates: Option[RexNode] = extractTemporalJoinCondition() + + def extractTemporalJoinCondition(): Option[RexNode] = { +checkState( + !joinInfo.isEqui, + "Missing %s in join condition", + TEMPORAL_JOIN_CONDITION) + +val nonEquiJoinRex: RexNode = joinInfo.getRemaining(rexBuilder) +val temporalJoinConditionExtractor = new TemporalJoinConditionExtractor( + nonEquiJoinRex.toString) + 
+Some(temporalJoinConditionExtractor.apply(nonEquiJoinRex)) + } + + override protected def createCoProcessFunction( + joinType: JoinRelType, + queryConfig: StreamQueryConfig, + joinFunction: GeneratedFunction[FlatJoinFunction[Row, Row, Row], Row]) +: CoProcessFunction[CRow, CRow, CRow] = { + +checkState( + leftTimeAttribute.isDefined && +rightPrimaryKeyExpression.isDefined, + "Missing %s in join condition", + TEMPORAL_JOIN_CONDITION) + +if (rightTimeAttribute.isDefined) { + throw new ValidationException( +s"Currently only proctime temporal joins are supported in [${textualRepresentation}]") +} + +joinType match { + case JoinRelType.INNER => +new TemporalJoin( + leftSchema.typeInfo, + rightSchema.typeInfo, + joinFunction.name, + joinFunction.code, + queryConfig) + case _ => + throw new ValidationException( + s"Only ${JoinRelType.INNER} temporal join is supported in [${textualRepresentation}]") +} + } + + private class TemporalJoinConditionExtractor( + nonEquiJoinCondition: String) +extends RexShuttle { + +override def visitCall(call: RexCall): RexNode = { + if (call.getOperator != TEMPORAL_JOIN_CONDITION) { +return super.visitCall(call) + } + + if (leftTimeAttribute.isDefined +|| rightPrimaryKeyExpression.isDefined +|| rightTimeAttribute.isDefined) { +throw new ValidationException( + s"Multiple ${TEMPORAL_JOIN_CONDITION} functions in [${textualRepresentation}]") + } + if (call.getOperands.size() == 3) { Review comment: I have extracted this logic to `LogicalTemporalTableJoin.isRowtime
[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins URL: https://github.com/apache/flink/pull/6299#discussion_r218423118 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamTemporalTableJoinRule.scala ## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.rules.datastream + +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.core.JoinRelType +import org.apache.flink.table.api.TableConfig +import org.apache.flink.table.plan.nodes.FlinkConventions +import org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalTableJoin +import org.apache.flink.table.plan.nodes.logical.FlinkLogicalTemporalTableJoin +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.table.runtime.join.WindowJoinUtil + +class DataStreamTemporalTableJoinRule + extends ConverterRule( +classOf[FlinkLogicalTemporalTableJoin], +FlinkConventions.LOGICAL, +FlinkConventions.DATASTREAM, +"DataStreamTemporalTableJoinRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val join: FlinkLogicalTemporalTableJoin = call.rel(0) +val joinInfo = join.analyzeCondition + +val (windowBounds, remainingPreds) = WindowJoinUtil.extractWindowBoundsFromPredicate( Review comment: ok :( This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins URL: https://github.com/apache/flink/pull/6299#discussion_r218374441 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala ## @@ -678,6 +678,32 @@ case class WindowAggregate( } } +case class TemporalTable( Review comment: `Constructor` is inconsistent with other case classes here. This behaviour is already quite clearly visible in this class. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins URL: https://github.com/apache/flink/pull/6299#discussion_r218420897 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala ## @@ -400,6 +411,10 @@ class RexTimeIndicatorMaterializer( // materialize operands with time indicators val materializedOperands = updatedCall.getOperator match { + case tableFunction: TableSqlFunction Review comment: Good catch. This must have turned into dead code. Removed. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins URL: https://github.com/apache/flink/pull/6299#discussion_r218479232 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/TemporalTableJoinValidationTest.scala ## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.api.stream.table.validation + +import java.sql.Timestamp + +import org.apache.flink.api.scala._ +import org.apache.flink.table.api.{TableException, ValidationException} +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.utils._ +import org.junit.Test + +class TemporalTableJoinValidationTest extends TableTestBase { + + val util: TableTestUtil = streamTestUtil() + + val orders = util.addTable[(Long, String, Timestamp)]( +"Orders", 'o_amount, 'o_currency, 'o_rowtime.rowtime) + + val ordersProctime = util.addTable[(Long, String)]( +"OrdersProctime", 'o_amount, 'o_currency, 'o_rowtime.proctime) + + val ordersWithoutTimeAttribute = util.addTable[(Long, String, Timestamp)]( +"OrdersWithoutTimeAttribute", 'o_amount, 'o_currency, 'o_rowtime) + + val ratesHistory = util.addTable[(String, Int, Timestamp)]( +"RatesHistory", 'currency, 'rate, 'rowtime.rowtime) + + val ratesHistoryWithoutTimeAttribute = util.addTable[(String, Int, Timestamp)]( +"ratesHistoryWithoutTimeAttribute", 'currency, 'rate, 'rowtime) + + @Test + def testInvalidFieldReference(): Unit = { +expectedException.expect(classOf[ValidationException]) +expectedException.expectMessage("Cannot resolve field [foobar]") + +ratesHistory.createTemporalTableFunction('rowtime.rowtime, 'foobar) + } + + @Test + def testInvalidStringFieldReference(): Unit = { +expectedException.expect(classOf[ValidationException]) +expectedException.expectMessage("Cannot resolve field [foobar]") + +ratesHistory.createTemporalTableFunction("rowtime", "foobar") + } + + @Test + def testNonTimeIndicatorOnRightSide(): Unit = { +expectedException.expect(classOf[ValidationException]) +expectedException.expectMessage( + "Non rowtime timeAttribute [TIMESTAMP(3)] used to create TemporalTableFunction") + +val rates = ratesHistoryWithoutTimeAttribute.createTemporalTableFunction('rowtime, 'currency) Review comment: But the previous version of this PR was doing it earlier on the API level. 
How is it handled by windowed aggregations/windowed joins? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins URL: https://github.com/apache/flink/pull/6299#discussion_r218460055 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamTemporalJoinToCoProcessTranslator.scala ## @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.nodes.datastream + +import org.apache.calcite.rel.core.{JoinInfo, JoinRelType} +import org.apache.calcite.rex._ +import org.apache.flink.api.common.functions.{FlatJoinFunction, MapFunction} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.{StreamQueryConfig, TableConfig, TableException, ValidationException} +import org.apache.flink.table.calcite.FlinkTypeFactory._ +import org.apache.flink.table.codegen.{FunctionCodeGenerator, GeneratedFunction} +import org.apache.flink.table.plan.logical.rel.LogicalTemporalTableJoin._ +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.table.plan.util.RexDefaultVisitor +import org.apache.flink.table.runtime.join.TemporalJoin +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} +import org.apache.flink.types.Row +import org.apache.flink.util.Preconditions.checkState + +class DataStreamTemporalJoinToCoProcessTranslator( +textualRepresentation: String, +config: TableConfig, +returnType: TypeInformation[Row], +leftSchema: RowSchema, +rightSchema: RowSchema, +joinInfo: JoinInfo, +rexBuilder: RexBuilder) + extends DataStreamJoinToCoProcessTranslator( +config, +returnType, +leftSchema, +rightSchema, +joinInfo, +rexBuilder) { + + var leftTimeAttribute: Option[RexNode] = None + + var rightTimeAttribute: Option[RexNode] = None + + var rightPrimaryKeyExpression: Option[RexNode] = None + + override val nonEquiJoinPredicates: Option[RexNode] = extractTemporalJoinCondition() + + def extractTemporalJoinCondition(): Option[RexNode] = { +checkState( + !joinInfo.isEqui, + "Missing %s in join condition", + TEMPORAL_JOIN_CONDITION) + +val nonEquiJoinRex: RexNode = joinInfo.getRemaining(rexBuilder) +val temporalJoinConditionExtractor = new TemporalJoinConditionExtractor( Review comment: I have refactored this class and moved out all of the construction 
code. It makes it nicer (for example, `leftTimeAttribute`, `rightTimeAttribute` and `rightPrimaryKeyExpression` are now proper final fields). This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins URL: https://github.com/apache/flink/pull/6299#discussion_r218364079 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala ## @@ -139,6 +139,23 @@ case class WindowReference(name: String, tpe: Option[TypeInformation[_]] = None) override def toString: String = s"'$name" } +case class TemporalTableReference(name: String, typeInformation: TypeInformation[_]) Review comment: If I remember correctly, I introduced it in order to implement `RowtimeAttribute#validateInput` and `RowtimeAttribute#resultType`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins URL: https://github.com/apache/flink/pull/6299#discussion_r218369289 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TemporalTableFunction.scala ## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.functions + +import java.sql.Timestamp + +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.table.api.Table +import org.apache.flink.table.expressions.Expression +import org.apache.flink.types.Row + +/** + * Class representing temporal table function over some history table. + */ +class TemporalTableFunction private( +@transient private val underlyingHistoryTable: Table, +private val timeAttribute: Expression, +private val primaryKey: String, +private val resultType: RowTypeInfo) + extends TableFunction[Row] { + + def eval(row: Timestamp): Unit = { Review comment: I don't think that's a good idea. That would break the abstraction and would require adjusting/refactoring more code, while this is a special case. This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins URL: https://github.com/apache/flink/pull/6299#discussion_r218417820 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala ## @@ -166,9 +172,14 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle { val right = join.getRight.accept(this) LogicalJoin.create(left, right, join.getCondition, join.getVariablesSet, join.getJoinType) - } + def visit(temporalJoin: LogicalTemporalTableJoin): RelNode = { +val left = temporalJoin.getLeft.accept(this) +val right = temporalJoin.getRight.accept(this) + +temporalJoin.copy(temporalJoin.getTraitSet, List(left, right)) Review comment: Why? The implementation looks like it's working, and logically the output still has a valid processing time attribute. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins URL: https://github.com/apache/flink/pull/6299#discussion_r212625803 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/VersionedJoin.scala ## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.CRowWrappingCollector +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.typeutils.TypeCheckUtils._ +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +class VersionedJoin( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +genJoinFuncName: String, +genJoinFuncCode: String, +queryConfig: StreamQueryConfig) + extends CoProcessFunction[CRow, CRow, CRow] + with Compiler[FlatJoinFunction[Row, Row, Row]] + with Logging { + + validateEqualsHashCode("join", leftType) + validateEqualsHashCode("join", rightType) + + protected var rightState: ValueState[Row] = _ + protected var cRowWrapper: CRowWrappingCollector = _ + + protected var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + + override def open(parameters: Configuration): Unit = { +val clazz = compile( + getRuntimeContext.getUserCodeClassLoader, + genJoinFuncName, + genJoinFuncCode) + +joinFunction = clazz.newInstance() + +val rightStateDescriptor = new ValueStateDescriptor[Row]("right", rightType) Review comment: Could we do it as a follow up task to minimize PR size? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins URL: https://github.com/apache/flink/pull/6299#discussion_r208833159 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToVersionedJoinRule.scala ## @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.rules.logical + +import java.util +import java.util.Collections + +import org.apache.calcite.plan.RelOptRule.{any, none, operand, some} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.core.TableFunctionScan +import org.apache.calcite.rel.logical.LogicalCorrelate +import org.apache.calcite.rex._ +import org.apache.calcite.sql.fun.SqlCastFunction +import org.apache.flink.table.api.{Table, ValidationException} +import org.apache.flink.table.functions.TableVersionFunction +import org.apache.flink.table.functions.sql.ProctimeSqlFunction +import org.apache.flink.table.functions.utils.TableSqlFunction +import org.apache.flink.table.plan.logical.rel.LogicalVersionedJoin +import org.apache.flink.table.plan.schema.TimeIndicatorRelDataType +import org.apache.flink.table.plan.util.RexDefaultVisitor +import org.apache.flink.table.typeutils.TypeStringUtils +import org.apache.flink.util.Preconditions.checkState + +class LogicalCorrelateToVersionedJoinRule + extends RelOptRule( +operand(classOf[LogicalCorrelate], + some( +operand(classOf[RelNode], any()), +operand(classOf[TableFunctionScan], none(, +"LogicalCorrelateToVersionedJoinRule") { + + override def onMatch(call: RelOptRuleCall): Unit = { +val logicalCorrelate: LogicalCorrelate = call.rel(0) +val leftNode: RelNode = call.rel(1) +val rightTableFunctionScan: TableFunctionScan = call.rel(2) + +val cluster = logicalCorrelate.getCluster + +new GetTableVersionFunctionCall(cluster.getRexBuilder, leftNode) + .visit(rightTableFunctionScan.getCall) match { + case Some(rightVersionedTableCall) => +val underlyingTableHistory: Table = rightVersionedTableCall.tableVersionFunction.table +val relBuilder = this.relBuilderFactory.create( + cluster, + underlyingTableHistory.relBuilder.getRelOptSchema) +val rexBuilder = cluster.getRexBuilder + +val rightNode: RelNode = 
underlyingTableHistory.logicalPlan.toRelNode(relBuilder) + +val rightVersionExpression = createRightExpression( + rexBuilder, + leftNode, + rightNode, + rightVersionedTableCall.tableVersionFunction.versionField) + +val rightPrimaryKeyExpression = createRightExpression( + rexBuilder, + leftNode, + rightNode, + rightVersionedTableCall.tableVersionFunction.primaryKey) + +relBuilder.push( + if (rightVersionedTableCall.tableVersionFunction.isProctime) { +LogicalVersionedJoin.create( + rexBuilder, + cluster, + logicalCorrelate.getTraitSet, + leftNode, + rightNode, + rightVersionedTableCall.versionExpression, + rightPrimaryKeyExpression) + } + else { +LogicalVersionedJoin.create( + rexBuilder, + cluster, + logicalCorrelate.getTraitSet, + leftNode, + rightNode, + rightVersionedTableCall.versionExpression, + rightVersionExpression, + rightPrimaryKeyExpression) + }) +call.transformTo(relBuilder.build()) + case None => +} + } + + private def createRightExpression( + rexBuilder: RexBuilder, + leftNode: RelNode, + rightNode: RelNode, + field: String): RexNode = { +val rightReferencesOffset = leftNode.getRowType.getFieldCount +val rightDataTypeField = rightNode.getRowType.getField(field, false, false) +rexBuilder.makeInputRef( + rightDataTypeField.getType, rightReferencesOffset + rightDataTypeField.getIndex) + } +} + +ob
[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins URL: https://github.com/apache/flink/pull/6299#discussion_r212257550 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableVersionFunction.scala ## @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.functions + +import java.sql.Timestamp + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.table.api.Table +import org.apache.flink.types.Row + +/** + * Class represnting table version functions over some history table. 
+ */ +class TableVersionFunction private( +@transient private val _table: Table, +private[flink] val versionField: String, +private[flink] val primaryKey: String, +private[flink] val resultType: RowTypeInfo, +private[flink] val isProctime: Boolean) + extends TableFunction[Row] { + + def eval(row: Timestamp): Unit = { Review comment: Without it I'm getting: ``` org.apache.flink.table.api.ValidationException: Function class 'org.apache.flink.table.functions.TemporalTableFunction' does not implement at least one method named 'eval' which is public, not abstract and (in case of table functions) not static. ``` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins URL: https://github.com/apache/flink/pull/6299#discussion_r208835295 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala ## @@ -103,27 +97,42 @@ class DataStreamJoin( tableEnv: StreamTableEnvironment, queryConfig: StreamQueryConfig): DataStream[CRow] = { -val config = tableEnv.getConfig -val returnType = schema.typeInfo -val keyPairs = joinInfo.pairs().toList +validateKeyTypes() -// get the equality keys -val leftKeys = ArrayBuffer.empty[Int] -val rightKeys = ArrayBuffer.empty[Int] +val leftDataStream = Review comment: I would be against that. I'm depending on this change in this PR, so it defines strict order of those two commits. Either I wouldn't publish the actual versioned joins PR after this commit is merged, or the versioned joins PR would look exactly the same as it looks now. Splitting into two PRs adds a lot of overhead (in case of rebases or applying review changes you need to keep updating multiple PRs) and room for a lot of problems with PRs going out of sync. During reviewing it's also annoying, because you see the same code twice and reviewers (same or different ones) are often (almost always?) making half of the comments in one PR and half in the other. Besides, this is already in a separate commit so it can be reviewed separately. Maybe instead of reviewing PR all at once try reviewing it commit by commit? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins URL: https://github.com/apache/flink/pull/6299#discussion_r208835846 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala ## @@ -167,28 +167,28 @@ class JoinHarnessTest extends HarnessTestBase { testHarness.setProcessingTime(1) testHarness.processElement1(new StreamRecord( - CRow(Row.of(1L: JLong, "1a1"), change = true), 1)) + CRow(1L: JLong, "1a1"), 1)) Review comment: Again, I'm depending on this change in the following commit. What harm does it do here? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins URL: https://github.com/apache/flink/pull/6299#discussion_r208835295 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala ## @@ -103,27 +97,42 @@ class DataStreamJoin( tableEnv: StreamTableEnvironment, queryConfig: StreamQueryConfig): DataStream[CRow] = { -val config = tableEnv.getConfig -val returnType = schema.typeInfo -val keyPairs = joinInfo.pairs().toList +validateKeyTypes() -// get the equality keys -val leftKeys = ArrayBuffer.empty[Int] -val rightKeys = ArrayBuffer.empty[Int] +val leftDataStream = Review comment: I would be against that. I'm depending on this change in this PR, so it defines strict order of those two commits. Either I wouldn't publish the actual versioned joins PR after this commit is merged, or the versioned joins PR would look exactly the same as it looks now. Splitting into two PRs adds a lot of overhead and room for a lot of problems with PRs going out of sync. During reviewing it's also annoying, because you see the same code twice and reviewers (same or different ones) are often (almost always?) making half of the comments in one PR and half in the other. Besides, this is already in a separate commit so it can be reviewed separately. Maybe instead of reviewing PR all at once try reviewing it commit by commit? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins URL: https://github.com/apache/flink/pull/6299#discussion_r208829502 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ## @@ -155,6 +158,90 @@ class Table( select(withResolvedAggFunctionCall: _*) } + /** +* Creates table version function backed up by this table as a history table. +* +* @param version field for the [[TableVersionFunction]]. Must points to a time indicator +* @param primaryKey field for the [[TableVersionFunction]]. +* @return [[TableVersionFunction]] +*/ + def createTableVersionFunction( + version: String, + primaryKey: String): TableVersionFunction = { Review comment: In the first version, @fhueske and I didn't want to focus on this, so we agreed to do it as a follow-up task. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins URL: https://github.com/apache/flink/pull/6299#discussion_r208830206 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala ## @@ -46,6 +46,8 @@ class FlinkRelBuilder( relOptCluster, relOptSchema) { + def getRelOptSchema: RelOptSchema = relOptSchema Review comment: Yes, check for `getRelOptSchema` usage in this commit This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins URL: https://github.com/apache/flink/pull/6299#discussion_r208831842 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalVersionedJoin.scala ## @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.logical.rel + +import java.util.Collections + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.core._ +import org.apache.calcite.rex.{RexBuilder, RexNode} +import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes} +import org.apache.calcite.sql.{SqlFunction, SqlFunctionCategory, SqlKind} +import org.apache.flink.util.Preconditions.checkArgument + +/** + * Represents a join between a table and [[org.apache.flink.table.functions.TableVersionFunction]] + * + * @param cluster + * @param traitSet + * @param left stream + * @param right table scan of underlying [[org.apache.flink.table.functions.TableVersionFunction]] + * @param condition must contain [[LogicalVersionedJoin#VERSIONING_JOIN_CONDITION()]] with + * correctly defined references to rightVersioneExpression, + * rightPrimaryKeyExpression and leftVersionExpression. We can not implement + * those references as separate fields, because of problems with Calcite's + * optimization rules like projections push downs, column + * pruning/renaming/reordering, etc. Later rightVersioneExpression, + * rightPrimaryKeyExpression and leftVersionExpression will be extracted from + * the condition. 
+ */ +class LogicalVersionedJoin private ( +cluster: RelOptCluster, +traitSet: RelTraitSet, +left: RelNode, +right: RelNode, +condition: RexNode) + extends Join( +cluster, +traitSet, +left, +right, +condition, +Collections.emptySet().asInstanceOf[java.util.Set[CorrelationId]], +JoinRelType.INNER) { + + override def copy( + traitSet: RelTraitSet, + condition: RexNode, + left: RelNode, + right: RelNode, + joinType: JoinRelType, + semiJoinDone: Boolean): LogicalVersionedJoin = { +checkArgument(joinType == this.getJoinType, + "Can not change join type".asInstanceOf[Object]) +checkArgument(semiJoinDone == this.isSemiJoinDone, + "Can not change semiJoinDone".asInstanceOf[Object]) +new LogicalVersionedJoin( + cluster, + traitSet, + left, + right, + condition) + } +} + +object LogicalVersionedJoin { + /** +* See [[LogicalVersionedJoin#condition]] +*/ + val VERSIONING_JOIN_CONDITION = new SqlFunction( Review comment: We want and need to use Calcite's optimisations for joins (like predicate pushdown, column pruning, etc.). To enable that, `LogicalVersionedJoin` extends Calcite's `Join`. However, there is no way to extend `Join`'s semantics to express that, besides the actual join condition, we need some extra fields to handle versioning. The only way to do this is to expose `leftVersionExpression`, `rightVersionExpression` and `rightPrimaryKeyExpression` via this `VERSIONING_JOIN_CONDITION`. Otherwise, if we kept those expressions in some hidden state, they could be pruned, renamed or reordered by Calcite's optimization rules. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins URL: https://github.com/apache/flink/pull/6299#discussion_r208833159 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToVersionedJoinRule.scala ## @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.rules.logical + +import java.util +import java.util.Collections + +import org.apache.calcite.plan.RelOptRule.{any, none, operand, some} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.core.TableFunctionScan +import org.apache.calcite.rel.logical.LogicalCorrelate +import org.apache.calcite.rex._ +import org.apache.calcite.sql.fun.SqlCastFunction +import org.apache.flink.table.api.{Table, ValidationException} +import org.apache.flink.table.functions.TableVersionFunction +import org.apache.flink.table.functions.sql.ProctimeSqlFunction +import org.apache.flink.table.functions.utils.TableSqlFunction +import org.apache.flink.table.plan.logical.rel.LogicalVersionedJoin +import org.apache.flink.table.plan.schema.TimeIndicatorRelDataType +import org.apache.flink.table.plan.util.RexDefaultVisitor +import org.apache.flink.table.typeutils.TypeStringUtils +import org.apache.flink.util.Preconditions.checkState + +class LogicalCorrelateToVersionedJoinRule + extends RelOptRule( +operand(classOf[LogicalCorrelate], + some( +operand(classOf[RelNode], any()), +operand(classOf[TableFunctionScan], none(, +"LogicalCorrelateToVersionedJoinRule") { + + override def onMatch(call: RelOptRuleCall): Unit = { +val logicalCorrelate: LogicalCorrelate = call.rel(0) +val leftNode: RelNode = call.rel(1) +val rightTableFunctionScan: TableFunctionScan = call.rel(2) + +val cluster = logicalCorrelate.getCluster + +new GetTableVersionFunctionCall(cluster.getRexBuilder, leftNode) + .visit(rightTableFunctionScan.getCall) match { + case Some(rightVersionedTableCall) => +val underlyingTableHistory: Table = rightVersionedTableCall.tableVersionFunction.table +val relBuilder = this.relBuilderFactory.create( + cluster, + underlyingTableHistory.relBuilder.getRelOptSchema) +val rexBuilder = cluster.getRexBuilder + +val rightNode: RelNode = 
underlyingTableHistory.logicalPlan.toRelNode(relBuilder) + +val rightVersionExpression = createRightExpression( + rexBuilder, + leftNode, + rightNode, + rightVersionedTableCall.tableVersionFunction.versionField) + +val rightPrimaryKeyExpression = createRightExpression( + rexBuilder, + leftNode, + rightNode, + rightVersionedTableCall.tableVersionFunction.primaryKey) + +relBuilder.push( + if (rightVersionedTableCall.tableVersionFunction.isProctime) { +LogicalVersionedJoin.create( + rexBuilder, + cluster, + logicalCorrelate.getTraitSet, + leftNode, + rightNode, + rightVersionedTableCall.versionExpression, + rightPrimaryKeyExpression) + } + else { +LogicalVersionedJoin.create( + rexBuilder, + cluster, + logicalCorrelate.getTraitSet, + leftNode, + rightNode, + rightVersionedTableCall.versionExpression, + rightVersionExpression, + rightPrimaryKeyExpression) + }) +call.transformTo(relBuilder.build()) + case None => +} + } + + private def createRightExpression( + rexBuilder: RexBuilder, + leftNode: RelNode, + rightNode: RelNode, + field: String): RexNode = { +val rightReferencesOffset = leftNode.getRowType.getFieldCount +val rightDataTypeField = rightNode.getRowType.getField(field, false, false) +rexBuilder.makeInputRef( + rightDataTypeField.getType, rightReferencesOffset + rightDataTypeField.getIndex) + } +} + +ob
[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins URL: https://github.com/apache/flink/pull/6299#discussion_r208833101 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToVersionedJoinRule.scala ## @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.rules.logical + +import java.util +import java.util.Collections + +import org.apache.calcite.plan.RelOptRule.{any, none, operand, some} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.core.TableFunctionScan +import org.apache.calcite.rel.logical.LogicalCorrelate +import org.apache.calcite.rex._ +import org.apache.calcite.sql.fun.SqlCastFunction +import org.apache.flink.table.api.{Table, ValidationException} +import org.apache.flink.table.functions.TableVersionFunction +import org.apache.flink.table.functions.sql.ProctimeSqlFunction +import org.apache.flink.table.functions.utils.TableSqlFunction +import org.apache.flink.table.plan.logical.rel.LogicalVersionedJoin +import org.apache.flink.table.plan.schema.TimeIndicatorRelDataType +import org.apache.flink.table.plan.util.RexDefaultVisitor +import org.apache.flink.table.typeutils.TypeStringUtils +import org.apache.flink.util.Preconditions.checkState + +class LogicalCorrelateToVersionedJoinRule + extends RelOptRule( +operand(classOf[LogicalCorrelate], + some( +operand(classOf[RelNode], any()), +operand(classOf[TableFunctionScan], none(, +"LogicalCorrelateToVersionedJoinRule") { + + override def onMatch(call: RelOptRuleCall): Unit = { +val logicalCorrelate: LogicalCorrelate = call.rel(0) +val leftNode: RelNode = call.rel(1) +val rightTableFunctionScan: TableFunctionScan = call.rel(2) + +val cluster = logicalCorrelate.getCluster + +new GetTableVersionFunctionCall(cluster.getRexBuilder, leftNode) + .visit(rightTableFunctionScan.getCall) match { + case Some(rightVersionedTableCall) => +val underlyingTableHistory: Table = rightVersionedTableCall.tableVersionFunction.table +val relBuilder = this.relBuilderFactory.create( + cluster, + underlyingTableHistory.relBuilder.getRelOptSchema) +val rexBuilder = cluster.getRexBuilder + +val rightNode: RelNode = 
underlyingTableHistory.logicalPlan.toRelNode(relBuilder) + +val rightVersionExpression = createRightExpression( + rexBuilder, + leftNode, + rightNode, + rightVersionedTableCall.tableVersionFunction.versionField) + +val rightPrimaryKeyExpression = createRightExpression( + rexBuilder, + leftNode, + rightNode, + rightVersionedTableCall.tableVersionFunction.primaryKey) + +relBuilder.push( + if (rightVersionedTableCall.tableVersionFunction.isProctime) { +LogicalVersionedJoin.create( + rexBuilder, + cluster, + logicalCorrelate.getTraitSet, + leftNode, + rightNode, + rightVersionedTableCall.versionExpression, + rightPrimaryKeyExpression) + } + else { +LogicalVersionedJoin.create( + rexBuilder, + cluster, + logicalCorrelate.getTraitSet, + leftNode, + rightNode, + rightVersionedTableCall.versionExpression, + rightVersionExpression, + rightPrimaryKeyExpression) + }) +call.transformTo(relBuilder.build()) + case None => +} + } + + private def createRightExpression( + rexBuilder: RexBuilder, + leftNode: RelNode, + rightNode: RelNode, + field: String): RexNode = { +val rightReferencesOffset = leftNode.getRowType.getFieldCount +val rightDataTypeField = rightNode.getRowType.getField(field, false, false) +rexBuilder.makeInputRef( + rightDataTypeField.getType, rightReferencesOffset + rightDataTypeField.getIndex) + } +} + +ob