[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins

2018-09-21 Thread GitBox
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] 
Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r219411672
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
 ##
 @@ -191,12 +174,10 @@ case class RowtimeAttribute(expr: Expression) extends 
TimeAttribute(expr) {
 ValidationSuccess
   case WindowReference(_, _) =>
 ValidationFailure("Reference to a rowtime or proctime window 
required.")
-  case TemporalTableReference(_, _) =>
-ValidationSuccess
   case any =>
 ValidationFailure(
-  s"The '.rowtime' expression can only be used for table definitions, 
windows, " +
-s" and temporal tables, while [${any}] was found.")
+  s"The '.rowtime' expression can only be used for table definitions, 
windows " +
+s"and temporal table definitions, while [$any] was found.")
 
 Review comment:
   Yes, you are right :)




[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins

2018-09-21 Thread GitBox
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] 
Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r219411313
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
 ##
 @@ -175,13 +173,41 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) 
extends RelShuttle {
 
 val rewrittenTemporalJoin = temporalJoin.copy(temporalJoin.getTraitSet, 
List(left, right))
 
-// Materialize all of the time attributes from the right side of temporal 
join
-val indicesToMaterialize =
-  (left.getRowType.getFieldCount until 
rewrittenTemporalJoin.getRowType.getFieldCount).toSet
+val indicesToMaterialize = 
gatherIndicesToMaterialize(rewrittenTemporalJoin, left, right)
 
 materializerUtils.projectAndMaterializeFields(rewrittenTemporalJoin, 
indicesToMaterialize)
   }
 
+  private def gatherIndicesToMaterialize(
+  temporalJoin: Join,
+  left: RelNode,
+  right: RelNode)
+: Set[Int] = {
+
+// Materialize all of the time attributes from the right side of temporal 
join
+var indicesToMaterialize =
+  (left.getRowType.getFieldCount until 
temporalJoin.getRowType.getFieldCount).toSet
+
+if (!hasRowtimeAttribute(right.getRowType)) {
 
 Review comment:
   No, I forgot. Adding one now.




[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins

2018-09-21 Thread GitBox
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] 
Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r219410591
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
 ##
 @@ -175,13 +173,41 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) 
extends RelShuttle {
 
 val rewrittenTemporalJoin = temporalJoin.copy(temporalJoin.getTraitSet, 
List(left, right))
 
-// Materialize all of the time attributes from the right side of temporal 
join
-val indicesToMaterialize =
-  (left.getRowType.getFieldCount until 
rewrittenTemporalJoin.getRowType.getFieldCount).toSet
+val indicesToMaterialize = 
gatherIndicesToMaterialize(rewrittenTemporalJoin, left, right)
 
 materializerUtils.projectAndMaterializeFields(rewrittenTemporalJoin, 
indicesToMaterialize)
   }
 
+  private def gatherIndicesToMaterialize(
 
 Review comment:
   Fixed, but IMO in the visitor pattern it is better to follow this layout:
   ```
   public visit(X);
   
   private doSthWith(X);
   
   private doSthElseWith(X);
   
   public visit(Y)
   
   private doSthWith(Y);
   
   private doSthElseWith(Y);
   ```
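   A minimal, self-contained Scala sketch of that ordering (the class and the `X`/`Y` types are hypothetical, only illustrating the grouping, not code from the PR):
   ```
   // Illustrative only: each public visit method is followed immediately by the
   // private helpers that belong to it, instead of grouping all helpers at the end.
   class X
   class Y

   class ExampleVisitor {
     def visit(x: X): Unit = { doSthWith(x); doSthElseWith(x) }

     private def doSthWith(x: X): Unit = ()
     private def doSthElseWith(x: X): Unit = ()

     def visit(y: Y): Unit = { doSthWith(y); doSthElseWith(y) }

     private def doSthWith(y: Y): Unit = ()
     private def doSthElseWith(y: Y): Unit = ()
   }
   ```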
   




[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins

2018-09-20 Thread GitBox
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] 
Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r219214189
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
 ##
 @@ -166,9 +167,20 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) 
extends RelShuttle {
 val right = join.getRight.accept(this)
 
 LogicalJoin.create(left, right, join.getCondition, join.getVariablesSet, 
join.getJoinType)
-
   }
 
+  def visit(temporalJoin: LogicalTemporalTableJoin): RelNode = {
+val left = temporalJoin.getLeft.accept(this)
+val right = temporalJoin.getRight.accept(this)
+
+val rewrittenTemporalJoin = temporalJoin.copy(temporalJoin.getTraitSet, 
List(left, right))
+
+// Materialize all of the time attributes from the right side of temporal 
join
+val indicesToMaterialize =
 
 Review comment:
   As we discussed, it is probably better to materialise it to make it more 
consistent.
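   To make the index computation from the diff above concrete, a small worked example (field counts made up for illustration):
   ```
   // If the left input has 3 fields and the join output has 5, the right-side
   // fields occupy the tail indices 3 and 4, and exactly those are materialized.
   val leftFieldCount = 3
   val joinFieldCount = 5
   val indicesToMaterialize: Set[Int] = (leftFieldCount until joinFieldCount).toSet
   // indicesToMaterialize == Set(3, 4)
   ```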




[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins

2018-09-20 Thread GitBox
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] 
Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r219213980
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
 ##
 @@ -442,11 +402,63 @@ class RexTimeIndicatorMaterializer(
 
   // materialize function's result and operands
   case _ if isTimeIndicatorType(updatedCall.getType) =>
-updatedCall.clone(timestamp, materializedOperands)
+updatedCall.clone(materializerUtils.getTimestamp, materializedOperands)
 
   // materialize function's operands only
   case _ =>
 updatedCall.clone(updatedCall.getType, materializedOperands)
 }
   }
 }
+
+/**
+  * Helper class for shared logic of materializing time attributes in 
[[RelNode]] and [[RexNode]].
+  */
+class RexTimeIndicatorMaterializerUtils(rexBuilder: RexBuilder) {
+
+  private val timestamp = rexBuilder
+.getTypeFactory
+.asInstanceOf[FlinkTypeFactory]
+.createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP, isNullable = false)
+
+  def getTimestamp: RelDataType = {
+timestamp
+  }
+
+  def projectAndMaterializeFields(input: RelNode, indicesToMaterialize: 
Set[Int]) : RelNode = {
+val projects = input.getRowType.getFieldList.map { field =>
+  materializeIfContains(
+new RexInputRef(field.getIndex, field.getType),
+field.getIndex,
+indicesToMaterialize)
+}
+
+LogicalProject.create(
 
 Review comment:
   I know, but I think there is no need to complicate the logic here (and in 
other similar places) when we can rely on other simple optimisation rules, 
like merging projections.
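   For context, this is roughly what relying on projection merging looks like with Calcite's hep planner; the rule and planner classes are standard Calcite, but wiring them up like this is only an illustration of the idea, not the rule set used by this PR:
   ```
   import org.apache.calcite.plan.hep.{HepPlanner, HepProgramBuilder}
   import org.apache.calcite.rel.RelNode
   import org.apache.calcite.rel.rules.ProjectMergeRule

   // Adjacent LogicalProjects (e.g. the extra one emitted by
   // projectAndMaterializeFields) can be collapsed by a later planner pass
   // instead of complicating the materializer itself.
   def mergeProjections(root: RelNode): RelNode = {
     val program = new HepProgramBuilder()
       .addRuleInstance(ProjectMergeRule.INSTANCE)
       .build()
     val planner = new HepPlanner(program)
     planner.setRoot(root)
     planner.findBestExp()
   }
   ```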




[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins

2018-09-20 Thread GitBox
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] 
Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r219212679
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ##
 @@ -158,36 +158,60 @@ class Table(
 
   /**
 * Creates [[TemporalTableFunction]] backed up by this table as a history 
table.
-*
-* @param timeIndicator field for the [[TemporalTableFunction]]. Must 
points to a time indicator
-* @param primaryKeyfield for the [[TemporalTableFunction]].
-* @return [[TemporalTableFunction]]
+* Temporal Tables represent a concept of a table that changes over time 
and for which
+* Flink keeps track of those changes. [[TemporalTableFunction]] provides a 
way how to access
 
 Review comment:
   I would keep `access` because we have a story in the backlog that would enable writing the following query:
   ```
   SELECT * FROM Rates('2017/06/14 14:23:12');
   ```
   Otherwise it would almost for sure drift out of sync :)






[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins

2018-09-18 Thread GitBox
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] 
Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r218414712
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ##
 @@ -803,70 +803,37 @@ abstract class StreamTableEnvironment(
 * @return The optimized [[RelNode]] tree
 */
   private[flink] def optimize(relNode: RelNode, updatesAsRetraction: Boolean): 
RelNode = {
-
-// 0. convert sub-queries before query decorrelation
-val convSubQueryPlan = runHepPlanner(
-  HepMatchOrder.BOTTOM_UP, FlinkRuleSets.TABLE_SUBQUERY_RULES, relNode, 
relNode.getTraitSet)
-
-// 0. convert table references
-val fullRelNode = runHepPlanner(
-  HepMatchOrder.BOTTOM_UP,
-  FlinkRuleSets.TABLE_REF_RULES,
-  convSubQueryPlan,
-  relNode.getTraitSet)
-
-// 1. decorrelate
+val convSubQueryPlan = optimizeConvertSubQueries(relNode)
+val temporalTableJoinPlan = optimizeConvertToTemporalJoin(convSubQueryPlan)
+val fullRelNode = optimizeConvertTableReferences(temporalTableJoinPlan)
 val decorPlan = RelDecorrelator.decorrelateQuery(fullRelNode)
-
-// 2. convert time indicators
 val convPlan = RelTimeIndicatorConverter.convert(decorPlan, 
getRelBuilder.getRexBuilder)
 
 Review comment:
   Any suggestions maybe? I renamed it to `planWithMaterializedTimeAttributes`




[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins

2018-09-18 Thread GitBox
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] 
Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r218422026
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalTemporalTableJoin.scala
 ##
 @@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.logical.rel
+
+import java.util.Collections
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core._
+import org.apache.calcite.rex.{RexBuilder, RexNode}
+import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes}
+import org.apache.calcite.sql.{SqlFunction, SqlFunctionCategory, SqlKind}
+import org.apache.flink.util.Preconditions.checkArgument
+
+/**
+  * Represents a join between a table and 
[[org.apache.flink.table.functions.TemporalTableFunction]]
+  *
+  * @param cluster
+  * @param traitSet
+  * @param left  stream
+  * @param right table scan of underlying
+  *  [[org.apache.flink.table.functions.TemporalTableFunction]]
+  * @param condition must contain 
[[LogicalTemporalTableJoin#TEMPORAL_JOIN_CONDITION]] with
+  *  correctly defined references to rightTimeAttribute,
+  *  rightPrimaryKeyExpression and leftTimeAttribute. We can 
not implement
+  *  those references as separate fields, because of problems 
with Calcite's
+  *  optimization rules like projections push downs, column
+  *  pruning/renaming/reordering, etc. Later 
rightTimeAttribute,
+  *  rightPrimaryKeyExpression and leftTimeAttribute will be 
extracted from
+  *  the condition.
+  */
+class LogicalTemporalTableJoin private(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+left: RelNode,
+right: RelNode,
+condition: RexNode)
+  extends Join(
+cluster,
+traitSet,
+left,
+right,
+condition,
+Collections.emptySet().asInstanceOf[java.util.Set[CorrelationId]],
+JoinRelType.INNER) {
+
+  override def copy(
+   traitSet: RelTraitSet,
+   condition: RexNode,
+   left: RelNode,
+   right: RelNode,
+   joinType: JoinRelType,
+   semiJoinDone: Boolean): LogicalTemporalTableJoin = {
+checkArgument(joinType == this.getJoinType,
+  "Can not change join type".asInstanceOf[Object])
+checkArgument(semiJoinDone == this.isSemiJoinDone,
+  "Can not change semiJoinDone".asInstanceOf[Object])
+new LogicalTemporalTableJoin(
+  cluster,
+  traitSet,
+  left,
+  right,
+  condition)
+  }
+}
+
+object LogicalTemporalTableJoin {
+  /**
+* See [[LogicalTemporalTableJoin#condition]]
+*/
+  val TEMPORAL_JOIN_CONDITION = new SqlFunction(
+"__TEMPORAL_JOIN_CONDITION",
+SqlKind.OTHER_FUNCTION,
+ReturnTypes.BOOLEAN_NOT_NULL,
+null,
+OperandTypes.or(
+  OperandTypes.sequence(
+"'(LEFT_TIME_ATTRIBUTE, RIGHT_TIME_ATTRIBUTE, PRIMARY_KEY)'",
+OperandTypes.DATETIME,
+OperandTypes.DATETIME,
+OperandTypes.ANY),
+  OperandTypes.sequence(
+"'(LEFT_TIME_ATTRIBUTE, PRIMARY_KEY)'",
+OperandTypes.DATETIME,
+OperandTypes.ANY)),
+SqlFunctionCategory.SYSTEM)
+
+  def makeRowTimeTemporalJoinConditionCall(
+  rexBuilder: RexBuilder,
+  leftTimeAttribute: RexNode,
+  rightTimeAttribute: RexNode,
+  rightPrimaryKeyExpression: RexNode): RexNode = {
+rexBuilder.makeCall(
+  TEMPORAL_JOIN_CONDITION,
+  leftTimeAttribute,
+  rightTimeAttribute,
+  rightPrimaryKeyExpression)
+  }
+
+  def makeProcTimeTemporalJoinConditionCall(
+  rexBuilder: RexBuilder,
+  leftTimeAttribute: RexNode,
+  rightPrimaryKeyExpression: RexNode): RexNode = {
+rexBuilder.makeCall(
+  TEMPORAL_JOIN_CONDITION,
+  leftTimeAttribute,
+  rightPrimaryKeyExpression)
+  }
+
+  /**
+* See [[LogicalTemporalTableJoin]]
+*/
+  def create(
+  rexBuilder: RexBuilder,
+  cluster: RelOptCluster,
+  traitSet: RelTraitSet,

[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins

2018-09-18 Thread GitBox
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] 
Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r218439267
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToTemporalTableJoinRule.scala
 ##
 @@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{any, none, operand, some}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.TableFunctionScan
+import org.apache.calcite.rel.logical.LogicalCorrelate
+import org.apache.calcite.rex._
+import org.apache.flink.table.api.{Table, Types, ValidationException}
+import org.apache.flink.table.calcite.FlinkTypeFactory.isProctimeIndicatorType
+import org.apache.flink.table.expressions._
+import org.apache.flink.table.functions.TemporalTableFunction
+import org.apache.flink.table.functions.utils.TableSqlFunction
+import org.apache.flink.table.plan.logical.rel.LogicalTemporalTableJoin
+import org.apache.flink.table.plan.util.RexDefaultVisitor
+
+class LogicalCorrelateToTemporalTableJoinRule
+  extends RelOptRule(
+operand(classOf[LogicalCorrelate],
+  some(
+operand(classOf[RelNode], any()),
+operand(classOf[TableFunctionScan], none()))),
+"LogicalCorrelateToTemporalTableJoinRule") {
+
+  def extractNameFromTimeAttribute(timeAttribute: Expression): String = {
+timeAttribute match {
+  case RowtimeAttribute(expr: TemporalTableReference) =>
+expr.name
+  case ProctimeAttribute(expr: TemporalTableReference) =>
+expr.name
+  case TemporalTableReference(name, _) =>
+name
+  case ResolvedFieldReference(name, _)
+if timeAttribute.resultType == Types.LONG ||
+  timeAttribute.resultType == Types.SQL_TIMESTAMP =>
+name
+  case _ => throw new ValidationException(
+s"Invalid timeAttribute [${timeAttribute}] in TemporalTableFunction")
+}
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+val logicalCorrelate: LogicalCorrelate = call.rel(0)
+val leftNode: RelNode = call.rel(1)
+val rightTableFunctionScan: TableFunctionScan = call.rel(2)
+
+val cluster = logicalCorrelate.getCluster
+
+new GetTemporalTableFunctionCall(cluster.getRexBuilder, leftNode)
+  .visit(rightTableFunctionScan.getCall) match {
+  case Some(TemporalTableFunctionCall(rightTemporalTableFunction, 
leftTimeAttribute)) =>
+val underlyingHistoryTable: Table = 
rightTemporalTableFunction.getUnderlyingHistoryTable
+val relBuilder = this.relBuilderFactory.create(
+  cluster,
+  underlyingHistoryTable.relBuilder.getRelOptSchema)
+val rexBuilder = cluster.getRexBuilder
+
+val rightNode: RelNode = 
underlyingHistoryTable.logicalPlan.toRelNode(relBuilder)
+
+val rightTimeIndicatorExpression = createRightExpression(
+  rexBuilder,
+  leftNode,
+  rightNode,
+  
extractNameFromTimeAttribute(rightTemporalTableFunction.getTimeAttribute))
+
+val rightPrimaryKeyExpression = createRightExpression(
+  rexBuilder,
+  leftNode,
+  rightNode,
+  rightTemporalTableFunction.getPrimaryKey)
+
+relBuilder.push(
+  if 
(isProctimeIndicatorType(rightTemporalTableFunction.getTimeAttribute.resultType))
 {
+LogicalTemporalTableJoin.create(
+  rexBuilder,
+  cluster,
+  logicalCorrelate.getTraitSet,
+  leftNode,
+  rightNode,
+  leftTimeAttribute,
+  rightPrimaryKeyExpression)
+  }
+  else {
+LogicalTemporalTableJoin.create(
+  rexBuilder,
+  cluster,
+  logicalCorrelate.getTraitSet,
+  leftNode,
+  rightNode,
+  leftTimeAttribute,
+  rightTimeIndicatorExpression,
+  rightPrimaryKeyExpression)
+  })
+call.transf

[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins

2018-09-18 Thread GitBox
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] 
Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r218442168
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToTemporalTableJoinRule.scala
 ##
 @@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{any, none, operand, some}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.TableFunctionScan
+import org.apache.calcite.rel.logical.LogicalCorrelate
+import org.apache.calcite.rex._
+import org.apache.flink.table.api.{Table, Types, ValidationException}
+import org.apache.flink.table.calcite.FlinkTypeFactory.isProctimeIndicatorType
+import org.apache.flink.table.expressions._
+import org.apache.flink.table.functions.TemporalTableFunction
+import org.apache.flink.table.functions.utils.TableSqlFunction
+import org.apache.flink.table.plan.logical.rel.LogicalTemporalTableJoin
+import org.apache.flink.table.plan.util.RexDefaultVisitor
+
+class LogicalCorrelateToTemporalTableJoinRule
+  extends RelOptRule(
+operand(classOf[LogicalCorrelate],
+  some(
+operand(classOf[RelNode], any()),
+operand(classOf[TableFunctionScan], none()))),
+"LogicalCorrelateToTemporalTableJoinRule") {
+
+  def extractNameFromTimeAttribute(timeAttribute: Expression): String = {
+timeAttribute match {
+  case RowtimeAttribute(expr: TemporalTableReference) =>
+expr.name
+  case ProctimeAttribute(expr: TemporalTableReference) =>
+expr.name
+  case TemporalTableReference(name, _) =>
+name
+  case ResolvedFieldReference(name, _)
+if timeAttribute.resultType == Types.LONG ||
+  timeAttribute.resultType == Types.SQL_TIMESTAMP =>
+name
+  case _ => throw new ValidationException(
+s"Invalid timeAttribute [${timeAttribute}] in TemporalTableFunction")
+}
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+val logicalCorrelate: LogicalCorrelate = call.rel(0)
+val leftNode: RelNode = call.rel(1)
+val rightTableFunctionScan: TableFunctionScan = call.rel(2)
+
+val cluster = logicalCorrelate.getCluster
+
+new GetTemporalTableFunctionCall(cluster.getRexBuilder, leftNode)
+  .visit(rightTableFunctionScan.getCall) match {
+  case Some(TemporalTableFunctionCall(rightTemporalTableFunction, 
leftTimeAttribute)) =>
+val underlyingHistoryTable: Table = 
rightTemporalTableFunction.getUnderlyingHistoryTable
+val relBuilder = this.relBuilderFactory.create(
+  cluster,
+  underlyingHistoryTable.relBuilder.getRelOptSchema)
+val rexBuilder = cluster.getRexBuilder
+
+val rightNode: RelNode = 
underlyingHistoryTable.logicalPlan.toRelNode(relBuilder)
+
+val rightTimeIndicatorExpression = createRightExpression(
+  rexBuilder,
+  leftNode,
+  rightNode,
+  
extractNameFromTimeAttribute(rightTemporalTableFunction.getTimeAttribute))
+
+val rightPrimaryKeyExpression = createRightExpression(
+  rexBuilder,
+  leftNode,
+  rightNode,
+  rightTemporalTableFunction.getPrimaryKey)
+
+relBuilder.push(
+  if 
(isProctimeIndicatorType(rightTemporalTableFunction.getTimeAttribute.resultType))
 {
+LogicalTemporalTableJoin.create(
+  rexBuilder,
+  cluster,
+  logicalCorrelate.getTraitSet,
+  leftNode,
+  rightNode,
+  leftTimeAttribute,
+  rightPrimaryKeyExpression)
+  }
+  else {
+LogicalTemporalTableJoin.create(
+  rexBuilder,
+  cluster,
+  logicalCorrelate.getTraitSet,
+  leftNode,
+  rightNode,
+  leftTimeAttribute,
+  rightTimeIndicatorExpression,
+  rightPrimaryKeyExpression)
+  })
+call.transf



[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins

2018-09-18 Thread GitBox
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] 
Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r218486273
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToTemporalTableJoinRule.scala
 ##
 @@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{any, none, operand, some}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.TableFunctionScan
+import org.apache.calcite.rel.logical.LogicalCorrelate
+import org.apache.calcite.rex._
+import org.apache.flink.table.api.{Table, Types, ValidationException}
+import org.apache.flink.table.calcite.FlinkTypeFactory.isProctimeIndicatorType
+import org.apache.flink.table.expressions._
+import org.apache.flink.table.functions.TemporalTableFunction
+import org.apache.flink.table.functions.utils.TableSqlFunction
+import org.apache.flink.table.plan.logical.rel.LogicalTemporalTableJoin
+import org.apache.flink.table.plan.util.RexDefaultVisitor
+
+class LogicalCorrelateToTemporalTableJoinRule
+  extends RelOptRule(
+operand(classOf[LogicalCorrelate],
+  some(
+operand(classOf[RelNode], any()),
+operand(classOf[TableFunctionScan], none()))),
+"LogicalCorrelateToTemporalTableJoinRule") {
+
+  def extractNameFromTimeAttribute(timeAttribute: Expression): String = {
+timeAttribute match {
+  case RowtimeAttribute(expr: TemporalTableReference) =>
 
 Review comment:
   Dropped




[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins

2018-09-18 Thread GitBox
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] 
Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r218462800
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamTemporalJoinToCoProcessTranslator.scala
 ##
 @@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
+import org.apache.calcite.rex._
+import org.apache.flink.api.common.functions.{FlatJoinFunction, MapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, TableConfig, 
TableException, ValidationException}
+import org.apache.flink.table.calcite.FlinkTypeFactory._
+import org.apache.flink.table.codegen.{FunctionCodeGenerator, 
GeneratedFunction}
+import org.apache.flink.table.plan.logical.rel.LogicalTemporalTableJoin._
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.plan.util.RexDefaultVisitor
+import org.apache.flink.table.runtime.join.TemporalJoin
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+import org.apache.flink.util.Preconditions.checkState
+
+class DataStreamTemporalJoinToCoProcessTranslator(
+textualRepresentation: String,
+config: TableConfig,
+returnType: TypeInformation[Row],
+leftSchema: RowSchema,
+rightSchema: RowSchema,
+joinInfo: JoinInfo,
+rexBuilder: RexBuilder)
+  extends DataStreamJoinToCoProcessTranslator(
+config,
+returnType,
+leftSchema,
+rightSchema,
+joinInfo,
+rexBuilder) {
+
+  var leftTimeAttribute: Option[RexNode] = None
+
+  var rightTimeAttribute: Option[RexNode] = None
+
+  var rightPrimaryKeyExpression: Option[RexNode] = None
+
+  override val nonEquiJoinPredicates: Option[RexNode] = 
extractTemporalJoinCondition()
+
+  def extractTemporalJoinCondition(): Option[RexNode] = {
+checkState(
+  !joinInfo.isEqui,
+  "Missing %s in join condition",
+  TEMPORAL_JOIN_CONDITION)
+
+val nonEquiJoinRex: RexNode = joinInfo.getRemaining(rexBuilder)
+val temporalJoinConditionExtractor = new TemporalJoinConditionExtractor(
+  nonEquiJoinRex.toString)
+
+Some(temporalJoinConditionExtractor.apply(nonEquiJoinRex))
+  }
+
+  override protected def createCoProcessFunction(
+  joinType: JoinRelType,
+  queryConfig: StreamQueryConfig,
+  joinFunction: GeneratedFunction[FlatJoinFunction[Row, Row, Row], Row])
+: CoProcessFunction[CRow, CRow, CRow] = {
+
+checkState(
+  leftTimeAttribute.isDefined &&
+rightPrimaryKeyExpression.isDefined,
+  "Missing %s in join condition",
+  TEMPORAL_JOIN_CONDITION)
+
+if (rightTimeAttribute.isDefined) {
 
 Review comment:
   If we add an additional flag, we just duplicate the same logic, since we 
would still need to handle this `Optional` field:
   ```
   if (isRowtimeMode) {
     throw new ValidationException(
       s"Currently only proctime temporal joins are supported in [${textualRepresentation}]")
   }
   checkState(!rightTimeAttribute.isDefined);
   ```
   
   An alternative solution (IMO not worth the effort here) would be to split this 
class into one for processing time and one for rowtime. Then we could skip 
this check altogether.
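   A rough sketch of that alternative (class names hypothetical, not from the PR), splitting by time semantics so the proctime variant simply has no right-side time attribute to validate:
   ```
   import org.apache.calcite.rex.RexNode

   // Each variant carries only the attributes it needs; the proctime translator
   // never has to check for an absent rowtime attribute.
   abstract class TemporalJoinTranslatorBase

   class ProctimeTemporalJoinTranslator(
       val leftTimeAttribute: RexNode,
       val rightPrimaryKeyExpression: RexNode)
     extends TemporalJoinTranslatorBase

   class RowtimeTemporalJoinTranslator(
       val leftTimeAttribute: RexNode,
       val rightTimeAttribute: RexNode,
       val rightPrimaryKeyExpression: RexNode)
     extends TemporalJoinTranslatorBase
   ```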




[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins

2018-09-18 Thread GitBox
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] 
Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r218414391
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
 ##
 @@ -448,48 +448,13 @@ abstract class BatchTableEnvironment(
 * @return The optimized [[RelNode]] tree
 */
   private[flink] def optimize(relNode: RelNode): RelNode = {
-
-// 0. convert sub-queries before query decorrelation
-val convSubQueryPlan = runHepPlanner(
-  HepMatchOrder.BOTTOM_UP, FlinkRuleSets.TABLE_SUBQUERY_RULES, relNode, 
relNode.getTraitSet)
-
-// 0. convert table references
-val fullRelNode = runHepPlanner(
-  HepMatchOrder.BOTTOM_UP,
-  FlinkRuleSets.TABLE_REF_RULES,
-  convSubQueryPlan,
-  relNode.getTraitSet)
-
-// 1. decorrelate
+val convSubQueryPlan = optimizeConvertSubQueries(relNode)
 
 Review comment:
   Isn't the general solution to define the `TableEnvironment` interface in Java 
and only extend it in Scala? Moving those methods to a static utility class 
doesn't seem like a proper solution. I think it would deserve a separate ticket.




[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins

2018-09-18 Thread GitBox
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] 
Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r218478087
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamTemporalJoinToCoProcessTranslator.scala
 ##
 @@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
+import org.apache.calcite.rex._
+import org.apache.flink.api.common.functions.{FlatJoinFunction, MapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, TableConfig, 
TableException, ValidationException}
+import org.apache.flink.table.calcite.FlinkTypeFactory._
+import org.apache.flink.table.codegen.{FunctionCodeGenerator, 
GeneratedFunction}
+import org.apache.flink.table.plan.logical.rel.LogicalTemporalTableJoin._
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.plan.util.RexDefaultVisitor
+import org.apache.flink.table.runtime.join.TemporalJoin
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+import org.apache.flink.util.Preconditions.checkState
+
+class DataStreamTemporalJoinToCoProcessTranslator(
+textualRepresentation: String,
+config: TableConfig,
+returnType: TypeInformation[Row],
+leftSchema: RowSchema,
+rightSchema: RowSchema,
+joinInfo: JoinInfo,
+rexBuilder: RexBuilder)
+  extends DataStreamJoinToCoProcessTranslator(
+config,
+returnType,
+leftSchema,
+rightSchema,
+joinInfo,
+rexBuilder) {
+
+  var leftTimeAttribute: Option[RexNode] = None
+
+  var rightTimeAttribute: Option[RexNode] = None
+
+  var rightPrimaryKeyExpression: Option[RexNode] = None
+
+  override val nonEquiJoinPredicates: Option[RexNode] = 
extractTemporalJoinCondition()
+
+  def extractTemporalJoinCondition(): Option[RexNode] = {
+checkState(
+  !joinInfo.isEqui,
+  "Missing %s in join condition",
+  TEMPORAL_JOIN_CONDITION)
+
+val nonEquiJoinRex: RexNode = joinInfo.getRemaining(rexBuilder)
+val temporalJoinConditionExtractor = new TemporalJoinConditionExtractor(
+  nonEquiJoinRex.toString)
+
+Some(temporalJoinConditionExtractor.apply(nonEquiJoinRex))
+  }
+
+  override protected def createCoProcessFunction(
+  joinType: JoinRelType,
+  queryConfig: StreamQueryConfig,
+  joinFunction: GeneratedFunction[FlatJoinFunction[Row, Row, Row], Row])
+: CoProcessFunction[CRow, CRow, CRow] = {
+
+checkState(
+  leftTimeAttribute.isDefined &&
+rightPrimaryKeyExpression.isDefined,
+  "Missing %s in join condition",
+  TEMPORAL_JOIN_CONDITION)
+
+if (rightTimeAttribute.isDefined) {
+  throw new ValidationException(
+s"Currently only proctime temporal joins are supported in 
[${textualRepresentation}]")
+}
+
+joinType match {
+  case JoinRelType.INNER =>
+new TemporalJoin(
+  leftSchema.typeInfo,
+  rightSchema.typeInfo,
+  joinFunction.name,
+  joinFunction.code,
+  queryConfig)
+  case _ =>
+   throw new ValidationException(
+ s"Only ${JoinRelType.INNER} temporal join is supported in 
[${textualRepresentation}]")
+}
+  }
+
+  private class TemporalJoinConditionExtractor(
+  nonEquiJoinCondition: String)
+extends RexShuttle {
+
+override def visitCall(call: RexCall): RexNode = {
+  if (call.getOperator != TEMPORAL_JOIN_CONDITION) {
+return super.visitCall(call)
+  }
+
+  if (leftTimeAttribute.isDefined
+|| rightPrimaryKeyExpression.isDefined
+|| rightTimeAttribute.isDefined) {
+throw new ValidationException(
+  s"Multiple ${TEMPORAL_JOIN_CONDITION} functions in 
[${textualRepresentation}]")
+  }
+  if (call.getOperands.size() == 3) {
 
 Review comment:
   I have extracted this logic to 
`LogicalTemporalTableJoin.isRowtime
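   The distinction follows from the operand signatures of `TEMPORAL_JOIN_CONDITION` quoted earlier in this thread: the rowtime form takes `(LEFT_TIME_ATTRIBUTE, RIGHT_TIME_ATTRIBUTE, PRIMARY_KEY)`, the proctime form only `(LEFT_TIME_ATTRIBUTE, PRIMARY_KEY)`. A hedged sketch of such a helper (the name is hypothetical):
   ```
   import org.apache.calcite.rex.RexCall

   // Rowtime calls of TEMPORAL_JOIN_CONDITION carry three operands,
   // proctime calls only two.
   def isRowtimeCall(call: RexCall): Boolean = call.getOperands.size() == 3
   ```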

[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins

2018-09-18 Thread GitBox
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] 
Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r218423118
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamTemporalTableJoinRule.scala
 ##
 @@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalTableJoin
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalTemporalTableJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.join.WindowJoinUtil
+
+class DataStreamTemporalTableJoinRule
+  extends ConverterRule(
+classOf[FlinkLogicalTemporalTableJoin],
+FlinkConventions.LOGICAL,
+FlinkConventions.DATASTREAM,
+"DataStreamTemporalTableJoinRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val join: FlinkLogicalTemporalTableJoin = call.rel(0)
+val joinInfo = join.analyzeCondition
+
+val (windowBounds, remainingPreds) = 
WindowJoinUtil.extractWindowBoundsFromPredicate(
 
 Review comment:
   ok :(




[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins

2018-09-18 Thread GitBox
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] 
Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r218374441
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
 ##
 @@ -678,6 +678,32 @@ case class WindowAggregate(
   }
 }
 
+case class TemporalTable(
 
 Review comment:
   `Constructor` is inconsistent with other case classes here.
   
This behaviour is already quite clearly visible in this class.




[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins

2018-09-18 Thread GitBox
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] 
Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r218420897
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
 ##
 @@ -400,6 +411,10 @@ class RexTimeIndicatorMaterializer(
 // materialize operands with time indicators
 val materializedOperands = updatedCall.getOperator match {
 
+  case tableFunction: TableSqlFunction
 
 Review comment:
   Good catch. This must have turned into dead code. Removed.




[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins

2018-09-18 Thread GitBox
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] 
Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r218479232
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/TemporalTableJoinValidationTest.scala
 ##
 @@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.table.validation
+
+import java.sql.Timestamp
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.{TableException, ValidationException}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils._
+import org.junit.Test
+
+class TemporalTableJoinValidationTest extends TableTestBase {
+
+  val util: TableTestUtil = streamTestUtil()
+
+  val orders = util.addTable[(Long, String, Timestamp)](
+"Orders", 'o_amount, 'o_currency, 'o_rowtime.rowtime)
+
+  val ordersProctime = util.addTable[(Long, String)](
+"OrdersProctime", 'o_amount, 'o_currency, 'o_rowtime.proctime)
+
+  val ordersWithoutTimeAttribute = util.addTable[(Long, String, Timestamp)](
+"OrdersWithoutTimeAttribute", 'o_amount, 'o_currency, 'o_rowtime)
+
+  val ratesHistory = util.addTable[(String, Int, Timestamp)](
+"RatesHistory", 'currency, 'rate, 'rowtime.rowtime)
+
+  val ratesHistoryWithoutTimeAttribute = util.addTable[(String, Int, 
Timestamp)](
+"ratesHistoryWithoutTimeAttribute", 'currency, 'rate, 'rowtime)
+
+  @Test
+  def testInvalidFieldReference(): Unit = {
+expectedException.expect(classOf[ValidationException])
+expectedException.expectMessage("Cannot resolve field [foobar]")
+
+ratesHistory.createTemporalTableFunction('rowtime.rowtime, 'foobar)
+  }
+
+  @Test
+  def testInvalidStringFieldReference(): Unit = {
+expectedException.expect(classOf[ValidationException])
+expectedException.expectMessage("Cannot resolve field [foobar]")
+
+ratesHistory.createTemporalTableFunction("rowtime", "foobar")
+  }
+
+  @Test
+  def testNonTimeIndicatorOnRightSide(): Unit = {
+expectedException.expect(classOf[ValidationException])
+expectedException.expectMessage(
+  "Non rowtime timeAttribute [TIMESTAMP(3)] used to create 
TemporalTableFunction")
+
+val rates = 
ratesHistoryWithoutTimeAttribute.createTemporalTableFunction('rowtime, 
'currency)
 
 Review comment:
   But the previous version of this PR was doing it earlier on the API level.
   
   How is it handled by windowed aggregations/windowed joins?




[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins

2018-09-18 Thread GitBox
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] 
Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r218460055
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamTemporalJoinToCoProcessTranslator.scala
 ##
 @@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
+import org.apache.calcite.rex._
+import org.apache.flink.api.common.functions.{FlatJoinFunction, MapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, TableConfig, 
TableException, ValidationException}
+import org.apache.flink.table.calcite.FlinkTypeFactory._
+import org.apache.flink.table.codegen.{FunctionCodeGenerator, 
GeneratedFunction}
+import org.apache.flink.table.plan.logical.rel.LogicalTemporalTableJoin._
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.plan.util.RexDefaultVisitor
+import org.apache.flink.table.runtime.join.TemporalJoin
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+import org.apache.flink.util.Preconditions.checkState
+
+class DataStreamTemporalJoinToCoProcessTranslator(
+textualRepresentation: String,
+config: TableConfig,
+returnType: TypeInformation[Row],
+leftSchema: RowSchema,
+rightSchema: RowSchema,
+joinInfo: JoinInfo,
+rexBuilder: RexBuilder)
+  extends DataStreamJoinToCoProcessTranslator(
+config,
+returnType,
+leftSchema,
+rightSchema,
+joinInfo,
+rexBuilder) {
+
+  var leftTimeAttribute: Option[RexNode] = None
+
+  var rightTimeAttribute: Option[RexNode] = None
+
+  var rightPrimaryKeyExpression: Option[RexNode] = None
+
+  override val nonEquiJoinPredicates: Option[RexNode] = 
extractTemporalJoinCondition()
+
+  def extractTemporalJoinCondition(): Option[RexNode] = {
+checkState(
+  !joinInfo.isEqui,
+  "Missing %s in join condition",
+  TEMPORAL_JOIN_CONDITION)
+
+val nonEquiJoinRex: RexNode = joinInfo.getRemaining(rexBuilder)
+val temporalJoinConditionExtractor = new TemporalJoinConditionExtractor(
 
 Review comment:
   I have refactored this class and moved out all of the construction code. It 
makes it nicer (for example `leftTimeAttribute`, `rightTimeAttribute` and 
`rightPrimaryKeyExpression` are now proper final fields).
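   A minimal sketch of the resulting shape (names hypothetical): the join condition is analysed once up front and the extracted attributes become immutable constructor parameters instead of mutable `var ...: Option[...]` fields:
   ```
   import org.apache.calcite.rex.RexNode

   // Extracted once before construction, then immutable for the lifetime of the
   // translator; only the right time attribute stays optional (proctime case).
   class ExtractedTemporalJoinCondition(
       val leftTimeAttribute: RexNode,
       val rightTimeAttribute: Option[RexNode],
       val rightPrimaryKeyExpression: RexNode)
   ```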




[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins

2018-09-18 Thread GitBox
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] 
Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r218364079
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
 ##
 @@ -139,6 +139,23 @@ case class WindowReference(name: String, tpe: 
Option[TypeInformation[_]] = None)
   override def toString: String = s"'$name"
 }
 
+case class TemporalTableReference(name: String, typeInformation: 
TypeInformation[_])
 
 Review comment:
   If I remember correctly, I introduced it in order to implement 
`RowtimeAttribute#validateInput` and `RowtimeAttribute#resultType`.




[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins

2018-09-18 Thread GitBox
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] 
Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r218369289
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TemporalTableFunction.scala
 ##
 @@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions
+
+import java.sql.Timestamp
+
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.api.Table
+import org.apache.flink.table.expressions.Expression
+import org.apache.flink.types.Row
+
+/**
+  * Class representing a temporal table function over some history table.
+  */
+class TemporalTableFunction private(
+@transient private val underlyingHistoryTable: Table,
+private val timeAttribute: Expression,
+private val primaryKey: String,
+private val resultType: RowTypeInfo)
+  extends TableFunction[Row] {
+
+  def eval(row: Timestamp): Unit = {
 
 Review comment:
   I don't think that's a good idea. That would break the abstraction and 
would require adjusting/refactoring more code, while this is a special case.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins

2018-09-18 Thread GitBox
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] 
Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r218417820
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
 ##
 @@ -166,9 +172,14 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) 
extends RelShuttle {
 val right = join.getRight.accept(this)
 
 LogicalJoin.create(left, right, join.getCondition, join.getVariablesSet, 
join.getJoinType)
-
   }
 
+  def visit(temporalJoin: LogicalTemporalTableJoin): RelNode = {
+val left = temporalJoin.getLeft.accept(this)
+val right = temporalJoin.getRight.accept(this)
+
+temporalJoin.copy(temporalJoin.getTraitSet, List(left, right))
 
 Review comment:
   Why? The implementation looks like it's working, and logically the output 
still has a valid processing time attribute.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins

2018-08-24 Thread GitBox
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] 
Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r212625803
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/VersionedJoin.scala
 ##
 @@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+class VersionedJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig)
+  extends CoProcessFunction[CRow, CRow, CRow]
+  with Compiler[FlatJoinFunction[Row, Row, Row]]
+  with Logging {
+
+  validateEqualsHashCode("join", leftType)
+  validateEqualsHashCode("join", rightType)
+
+  protected var rightState: ValueState[Row] = _
+  protected var cRowWrapper: CRowWrappingCollector = _
+
+  protected var joinFunction: FlatJoinFunction[Row, Row, Row] = _
+
+  override def open(parameters: Configuration): Unit = {
+val clazz = compile(
+  getRuntimeContext.getUserCodeClassLoader,
+  genJoinFuncName,
+  genJoinFuncCode)
+
+joinFunction = clazz.newInstance()
+
+val rightStateDescriptor = new ValueStateDescriptor[Row]("right", 
rightType)
 
 Review comment:
   Could we do it as a follow-up task to minimize the PR size?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins

2018-08-24 Thread GitBox
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] 
Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r208833159
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToVersionedJoinRule.scala
 ##
 @@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import java.util
+import java.util.Collections
+
+import org.apache.calcite.plan.RelOptRule.{any, none, operand, some}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.TableFunctionScan
+import org.apache.calcite.rel.logical.LogicalCorrelate
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.fun.SqlCastFunction
+import org.apache.flink.table.api.{Table, ValidationException}
+import org.apache.flink.table.functions.TableVersionFunction
+import org.apache.flink.table.functions.sql.ProctimeSqlFunction
+import org.apache.flink.table.functions.utils.TableSqlFunction
+import org.apache.flink.table.plan.logical.rel.LogicalVersionedJoin
+import org.apache.flink.table.plan.schema.TimeIndicatorRelDataType
+import org.apache.flink.table.plan.util.RexDefaultVisitor
+import org.apache.flink.table.typeutils.TypeStringUtils
+import org.apache.flink.util.Preconditions.checkState
+
+class LogicalCorrelateToVersionedJoinRule
+  extends RelOptRule(
+operand(classOf[LogicalCorrelate],
+  some(
+operand(classOf[RelNode], any()),
+operand(classOf[TableFunctionScan], none()))),
+"LogicalCorrelateToVersionedJoinRule") {
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+val logicalCorrelate: LogicalCorrelate = call.rel(0)
+val leftNode: RelNode = call.rel(1)
+val rightTableFunctionScan: TableFunctionScan = call.rel(2)
+
+val cluster = logicalCorrelate.getCluster
+
+new GetTableVersionFunctionCall(cluster.getRexBuilder, leftNode)
+  .visit(rightTableFunctionScan.getCall) match {
+  case Some(rightVersionedTableCall) =>
+val underlyingTableHistory: Table = 
rightVersionedTableCall.tableVersionFunction.table
+val relBuilder = this.relBuilderFactory.create(
+  cluster,
+  underlyingTableHistory.relBuilder.getRelOptSchema)
+val rexBuilder = cluster.getRexBuilder
+
+val rightNode: RelNode = 
underlyingTableHistory.logicalPlan.toRelNode(relBuilder)
+
+val rightVersionExpression = createRightExpression(
+  rexBuilder,
+  leftNode,
+  rightNode,
+  rightVersionedTableCall.tableVersionFunction.versionField)
+
+val rightPrimaryKeyExpression = createRightExpression(
+  rexBuilder,
+  leftNode,
+  rightNode,
+  rightVersionedTableCall.tableVersionFunction.primaryKey)
+
+relBuilder.push(
+  if (rightVersionedTableCall.tableVersionFunction.isProctime) {
+LogicalVersionedJoin.create(
+  rexBuilder,
+  cluster,
+  logicalCorrelate.getTraitSet,
+  leftNode,
+  rightNode,
+  rightVersionedTableCall.versionExpression,
+  rightPrimaryKeyExpression)
+  }
+  else {
+LogicalVersionedJoin.create(
+  rexBuilder,
+  cluster,
+  logicalCorrelate.getTraitSet,
+  leftNode,
+  rightNode,
+  rightVersionedTableCall.versionExpression,
+  rightVersionExpression,
+  rightPrimaryKeyExpression)
+  })
+call.transformTo(relBuilder.build())
+  case None =>
+}
+  }
+
+  private def createRightExpression(
+  rexBuilder: RexBuilder,
+  leftNode: RelNode,
+  rightNode: RelNode,
+  field: String): RexNode = {
+val rightReferencesOffset = leftNode.getRowType.getFieldCount
+val rightDataTypeField = rightNode.getRowType.getField(field, false, false)
+rexBuilder.makeInputRef(
+  rightDataTypeField.getType, rightReferencesOffset + 
rightDataTypeField.getIndex)
+  }
+}
+
+ob

[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins

2018-08-23 Thread GitBox
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] 
Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r212257550
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableVersionFunction.scala
 ##
 @@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions
+
+import java.sql.Timestamp
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.api.Table
+import org.apache.flink.types.Row
+
+/**
+  * Class representing a table version function over some history table.
+  */
+class TableVersionFunction private(
+@transient private val _table: Table,
+private[flink] val versionField: String,
+private[flink] val primaryKey: String,
+private[flink] val resultType: RowTypeInfo,
+private[flink] val isProctime: Boolean)
+  extends TableFunction[Row] {
+
+  def eval(row: Timestamp): Unit = {
 
 Review comment:
   Without it I'm getting:
   ```
   org.apache.flink.table.api.ValidationException: Function class 
'org.apache.flink.table.functions.TemporalTableFunction' does not implement at 
least one method named 'eval' which is public, not abstract and (in case of 
table functions) not static.
   ```
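
A minimal example of what satisfies that check (illustrative only, not code from the PR; `HistoryLookup` is a made-up name): any `TableFunction` subclass must declare at least one public, non-abstract, non-static method named `eval`, even if its body is empty.

```
import org.apache.flink.table.functions.TableFunction
import org.apache.flink.types.Row

// The body can stay empty; only the presence and visibility of an `eval`
// method matter to the validation that produced the exception above.
class HistoryLookup extends TableFunction[Row] {
  def eval(timestamp: java.sql.Timestamp): Unit = {}
}
```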


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins

2018-08-09 Thread GitBox
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] 
Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r208835295
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
 ##
 @@ -103,27 +97,42 @@ class DataStreamJoin(
   tableEnv: StreamTableEnvironment,
   queryConfig: StreamQueryConfig): DataStream[CRow] = {
 
-val config = tableEnv.getConfig
-val returnType = schema.typeInfo
-val keyPairs = joinInfo.pairs().toList
+validateKeyTypes()
 
-// get the equality keys
-val leftKeys = ArrayBuffer.empty[Int]
-val rightKeys = ArrayBuffer.empty[Int]
+val leftDataStream =
 
 Review comment:
   I would be against that. I'm depending on this change in this PR, so it 
defines a strict order of those two commits: either I couldn't publish the 
actual versioned joins PR until after this commit is merged, or the versioned 
joins PR would look exactly the same as it does now. Splitting into two PRs 
adds a lot of overhead (in case of rebases or review changes you need to keep 
updating multiple PRs) and leaves room for a lot of problems with PRs going 
out of sync. Reviewing is also annoying, because you see the same code twice 
and reviewers (the same or different ones) often (almost always?) make half 
of the comments in one PR and half in the other.
   
   Besides, this is already in a separate commit, so it can be reviewed 
separately. Maybe instead of reviewing the PR all at once, try reviewing it 
commit by commit?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins

2018-08-09 Thread GitBox
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] 
Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r208835846
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
 ##
 @@ -167,28 +167,28 @@ class JoinHarnessTest extends HarnessTestBase {
 
 testHarness.setProcessingTime(1)
 testHarness.processElement1(new StreamRecord(
-  CRow(Row.of(1L: JLong, "1a1"), change = true), 1))
+  CRow(1L: JLong, "1a1"), 1))
 
 Review comment:
   Again, I'm depending on this change in the following commit. What harm does 
it do here?
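
For context, the shortened `CRow(...)` calls above presumably go through a varargs factory; a sketch of what such a factory could look like (assumed shape, not copied from the PR, and named `CRowOf` here to avoid claiming it matches the actual companion object):

```
import org.apache.flink.table.runtime.types.CRow
import org.apache.flink.types.Row

// Wrap the varargs into a Row and default the change flag to true, which is
// what the shortened test calls rely on.
object CRowOf {
  def apply(values: Any*): CRow =
    new CRow(Row.of(values.map(_.asInstanceOf[AnyRef]): _*), true)
}
```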


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins

2018-08-09 Thread GitBox
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] 
Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r208829502
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ##
 @@ -155,6 +158,90 @@ class Table(
 select(withResolvedAggFunctionCall: _*)
   }
 
+  /**
+* Creates a table version function backed by this table as a history 
table.
+*
+* @param version field for the [[TableVersionFunction]]. Must point to a 
time indicator.
+* @param primaryKey field for the [[TableVersionFunction]].
+* @return [[TableVersionFunction]]
+*/
+  def createTableVersionFunction(
+  version: String,
+  primaryKey: String): TableVersionFunction = {
 
 Review comment:
   In the first version, @fhueske and I didn't want to focus on this, and we 
agreed to do it as a follow-up task.
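
For reference, a sketch of how the method quoted in the diff above could be called; only the signature comes from the diff, while the table and field names ("proctime", "currency") are made up for illustration:

```
import org.apache.flink.table.api.Table
import org.apache.flink.table.functions.TableVersionFunction

// Build a table version function from a history table, keyed by "currency"
// and versioned by its processing-time attribute.
def ratesVersionFunction(ratesHistory: Table): TableVersionFunction =
  ratesHistory.createTableVersionFunction("proctime", "currency")
```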


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins

2018-08-09 Thread GitBox
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] 
Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r208830206
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala
 ##
 @@ -46,6 +46,8 @@ class FlinkRelBuilder(
 relOptCluster,
 relOptSchema) {
 
+  def getRelOptSchema: RelOptSchema = relOptSchema
 
 Review comment:
   Yes, check for `getRelOptSchema` usage in this commit


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins

2018-08-09 Thread GitBox
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] 
Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r208831842
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalVersionedJoin.scala
 ##
 @@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.logical.rel
+
+import java.util.Collections
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core._
+import org.apache.calcite.rex.{RexBuilder, RexNode}
+import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes}
+import org.apache.calcite.sql.{SqlFunction, SqlFunctionCategory, SqlKind}
+import org.apache.flink.util.Preconditions.checkArgument
+
+/**
+  * Represents a join between a table and 
[[org.apache.flink.table.functions.TableVersionFunction]]
+  *
+  * @param cluster
+  * @param traitSet
+  * @param left stream
+  * @param right table scan of underlying 
[[org.apache.flink.table.functions.TableVersionFunction]]
+  * @param condition must contain 
[[LogicalVersionedJoin#VERSIONING_JOIN_CONDITION()]] with
+  *  correctly defined references to rightVersionExpression,
+  *  rightPrimaryKeyExpression and leftVersionExpression. We 
cannot implement
+  *  those references as separate fields, because of problems 
with Calcite's
+  *  optimization rules like projection push-downs, column
+  *  pruning/renaming/reordering, etc. Later 
rightVersionExpression,
+  *  rightPrimaryKeyExpression and leftVersionExpression will 
be extracted from
+  *  the condition.
+  */
+class LogicalVersionedJoin private (
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+left: RelNode,
+right: RelNode,
+condition: RexNode)
+  extends Join(
+cluster,
+traitSet,
+left,
+right,
+condition,
+Collections.emptySet().asInstanceOf[java.util.Set[CorrelationId]],
+JoinRelType.INNER) {
+
+  override def copy(
+   traitSet: RelTraitSet,
+   condition: RexNode,
+   left: RelNode,
+   right: RelNode,
+   joinType: JoinRelType,
+   semiJoinDone: Boolean): LogicalVersionedJoin = {
+checkArgument(joinType == this.getJoinType,
+  "Can not change join type".asInstanceOf[Object])
+checkArgument(semiJoinDone == this.isSemiJoinDone,
+  "Can not change semiJoinDone".asInstanceOf[Object])
+new LogicalVersionedJoin(
+  cluster,
+  traitSet,
+  left,
+  right,
+  condition)
+  }
+}
+
+object LogicalVersionedJoin {
+  /**
+* See [[LogicalVersionedJoin#condition]]
+*/
+  val VERSIONING_JOIN_CONDITION = new SqlFunction(
 
 Review comment:
   We want and need to use Calcite's join optimizations (predicate pushdown, 
column pruning, etc.). To enable that, `LogicalVersionedJoin` extends Calcite's 
`Join`. However, there is no way to extend `Join`'s semantics so that, besides 
the actual join condition, it carries the extra fields we need to handle 
versioning. The only way to do this is to expose `leftVersionExpression`, 
`rightVersionExpression` and `rightPrimaryKeyExpression` via this 
`VERSIONING_JOIN_CONDITION`. Otherwise, if we kept those expressions in some 
hidden state, they could be pruned/renamed/reordered/... .
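
A simplified sketch of that mechanism (not the PR's actual code; `VersioningConditionSketch`, `encode` and `decode` are illustrative names): the versioning expressions travel as operands of the marker function inside the join condition, so Calcite's rules rewrite them together with the rest of the condition, and they can be recovered later by inspecting the `RexCall`.

```
import org.apache.calcite.rex.{RexBuilder, RexCall, RexNode}
import org.apache.calcite.sql.SqlOperator

object VersioningConditionSketch {

  // Encode the extra expressions as operands of the marker function so they
  // stay consistent under projection push-down, column pruning, etc.
  def encode(
      rexBuilder: RexBuilder,
      marker: SqlOperator,
      leftVersion: RexNode,
      rightVersion: RexNode,
      rightPrimaryKey: RexNode): RexNode =
    rexBuilder.makeCall(marker, leftVersion, rightVersion, rightPrimaryKey)

  // Recover the expressions by matching on the marker call.
  def decode(marker: SqlOperator, condition: RexNode): Option[(RexNode, RexNode, RexNode)] =
    condition match {
      case call: RexCall if call.getOperator == marker =>
        val operands = call.getOperands
        Some((operands.get(0), operands.get(1), operands.get(2)))
      case _ => None
    }
}
```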


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins

2018-08-09 Thread GitBox
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] 
Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r208833159
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToVersionedJoinRule.scala
 ##
 @@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import java.util
+import java.util.Collections
+
+import org.apache.calcite.plan.RelOptRule.{any, none, operand, some}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.TableFunctionScan
+import org.apache.calcite.rel.logical.LogicalCorrelate
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.fun.SqlCastFunction
+import org.apache.flink.table.api.{Table, ValidationException}
+import org.apache.flink.table.functions.TableVersionFunction
+import org.apache.flink.table.functions.sql.ProctimeSqlFunction
+import org.apache.flink.table.functions.utils.TableSqlFunction
+import org.apache.flink.table.plan.logical.rel.LogicalVersionedJoin
+import org.apache.flink.table.plan.schema.TimeIndicatorRelDataType
+import org.apache.flink.table.plan.util.RexDefaultVisitor
+import org.apache.flink.table.typeutils.TypeStringUtils
+import org.apache.flink.util.Preconditions.checkState
+
+class LogicalCorrelateToVersionedJoinRule
+  extends RelOptRule(
+operand(classOf[LogicalCorrelate],
+  some(
+operand(classOf[RelNode], any()),
+operand(classOf[TableFunctionScan], none()))),
+"LogicalCorrelateToVersionedJoinRule") {
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+val logicalCorrelate: LogicalCorrelate = call.rel(0)
+val leftNode: RelNode = call.rel(1)
+val rightTableFunctionScan: TableFunctionScan = call.rel(2)
+
+val cluster = logicalCorrelate.getCluster
+
+new GetTableVersionFunctionCall(cluster.getRexBuilder, leftNode)
+  .visit(rightTableFunctionScan.getCall) match {
+  case Some(rightVersionedTableCall) =>
+val underlyingTableHistory: Table = 
rightVersionedTableCall.tableVersionFunction.table
+val relBuilder = this.relBuilderFactory.create(
+  cluster,
+  underlyingTableHistory.relBuilder.getRelOptSchema)
+val rexBuilder = cluster.getRexBuilder
+
+val rightNode: RelNode = 
underlyingTableHistory.logicalPlan.toRelNode(relBuilder)
+
+val rightVersionExpression = createRightExpression(
+  rexBuilder,
+  leftNode,
+  rightNode,
+  rightVersionedTableCall.tableVersionFunction.versionField)
+
+val rightPrimaryKeyExpression = createRightExpression(
+  rexBuilder,
+  leftNode,
+  rightNode,
+  rightVersionedTableCall.tableVersionFunction.primaryKey)
+
+relBuilder.push(
+  if (rightVersionedTableCall.tableVersionFunction.isProctime) {
+LogicalVersionedJoin.create(
+  rexBuilder,
+  cluster,
+  logicalCorrelate.getTraitSet,
+  leftNode,
+  rightNode,
+  rightVersionedTableCall.versionExpression,
+  rightPrimaryKeyExpression)
+  }
+  else {
+LogicalVersionedJoin.create(
+  rexBuilder,
+  cluster,
+  logicalCorrelate.getTraitSet,
+  leftNode,
+  rightNode,
+  rightVersionedTableCall.versionExpression,
+  rightVersionExpression,
+  rightPrimaryKeyExpression)
+  })
+call.transformTo(relBuilder.build())
+  case None =>
+}
+  }
+
+  private def createRightExpression(
+  rexBuilder: RexBuilder,
+  leftNode: RelNode,
+  rightNode: RelNode,
+  field: String): RexNode = {
+val rightReferencesOffset = leftNode.getRowType.getFieldCount
+val rightDataTypeField = rightNode.getRowType.getField(field, false, false)
+rexBuilder.makeInputRef(
+  rightDataTypeField.getType, rightReferencesOffset + 
rightDataTypeField.getIndex)
+  }
+}
+
+ob

[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins

2018-08-09 Thread GitBox
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] 
Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r208833101
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToVersionedJoinRule.scala
 ##
 @@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import java.util
+import java.util.Collections
+
+import org.apache.calcite.plan.RelOptRule.{any, none, operand, some}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.TableFunctionScan
+import org.apache.calcite.rel.logical.LogicalCorrelate
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.fun.SqlCastFunction
+import org.apache.flink.table.api.{Table, ValidationException}
+import org.apache.flink.table.functions.TableVersionFunction
+import org.apache.flink.table.functions.sql.ProctimeSqlFunction
+import org.apache.flink.table.functions.utils.TableSqlFunction
+import org.apache.flink.table.plan.logical.rel.LogicalVersionedJoin
+import org.apache.flink.table.plan.schema.TimeIndicatorRelDataType
+import org.apache.flink.table.plan.util.RexDefaultVisitor
+import org.apache.flink.table.typeutils.TypeStringUtils
+import org.apache.flink.util.Preconditions.checkState
+
+class LogicalCorrelateToVersionedJoinRule
+  extends RelOptRule(
+operand(classOf[LogicalCorrelate],
+  some(
+operand(classOf[RelNode], any()),
+operand(classOf[TableFunctionScan], none()))),
+"LogicalCorrelateToVersionedJoinRule") {
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+val logicalCorrelate: LogicalCorrelate = call.rel(0)
+val leftNode: RelNode = call.rel(1)
+val rightTableFunctionScan: TableFunctionScan = call.rel(2)
+
+val cluster = logicalCorrelate.getCluster
+
+new GetTableVersionFunctionCall(cluster.getRexBuilder, leftNode)
+  .visit(rightTableFunctionScan.getCall) match {
+  case Some(rightVersionedTableCall) =>
+val underlyingTableHistory: Table = 
rightVersionedTableCall.tableVersionFunction.table
+val relBuilder = this.relBuilderFactory.create(
+  cluster,
+  underlyingTableHistory.relBuilder.getRelOptSchema)
+val rexBuilder = cluster.getRexBuilder
+
+val rightNode: RelNode = 
underlyingTableHistory.logicalPlan.toRelNode(relBuilder)
+
+val rightVersionExpression = createRightExpression(
+  rexBuilder,
+  leftNode,
+  rightNode,
+  rightVersionedTableCall.tableVersionFunction.versionField)
+
+val rightPrimaryKeyExpression = createRightExpression(
+  rexBuilder,
+  leftNode,
+  rightNode,
+  rightVersionedTableCall.tableVersionFunction.primaryKey)
+
+relBuilder.push(
+  if (rightVersionedTableCall.tableVersionFunction.isProctime) {
+LogicalVersionedJoin.create(
+  rexBuilder,
+  cluster,
+  logicalCorrelate.getTraitSet,
+  leftNode,
+  rightNode,
+  rightVersionedTableCall.versionExpression,
+  rightPrimaryKeyExpression)
+  }
+  else {
+LogicalVersionedJoin.create(
+  rexBuilder,
+  cluster,
+  logicalCorrelate.getTraitSet,
+  leftNode,
+  rightNode,
+  rightVersionedTableCall.versionExpression,
+  rightVersionExpression,
+  rightPrimaryKeyExpression)
+  })
+call.transformTo(relBuilder.build())
+  case None =>
+}
+  }
+
+  private def createRightExpression(
+  rexBuilder: RexBuilder,
+  leftNode: RelNode,
+  rightNode: RelNode,
+  field: String): RexNode = {
+val rightReferencesOffset = leftNode.getRowType.getFieldCount
+val rightDataTypeField = rightNode.getRowType.getField(field, false, false)
+rexBuilder.makeInputRef(
+  rightDataTypeField.getType, rightReferencesOffset + 
rightDataTypeField.getIndex)
+  }
+}
+
+ob