Repository: flink
Updated Branches:
  refs/heads/master 1125122a7 -> 471345c0e


[FLINK-6232] [table] Add support for processing time inner windowed stream join.
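
For example, queries of roughly the following shape can now be executed on two
streaming tables (a sketch; the table and attribute names below are illustrative
and not taken from this commit):

    val sql =
      "SELECT t1.a, t2.b FROM T1 AS t1, T2 AS t2 " +
        "WHERE t1.a = t2.a AND " +
        "t1.proctime >= t2.proctime - INTERVAL '5' SECOND AND " +
        "t1.proctime <= t2.proctime + INTERVAL '5' SECOND"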


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ba6c59e6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ba6c59e6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ba6c59e6

Branch: refs/heads/master
Commit: ba6c59e6d744958db7332f88ffdd46effc9ad400
Parents: 1125122
Author: hongyuhong <hongyuh...@huawei.com>
Authored: Thu Jul 6 11:24:04 2017 +0800
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Tue Jul 18 00:36:41 2017 +0200

----------------------------------------------------------------------
 .../table/api/StreamTableEnvironment.scala      | 109 +-----
 .../calcite/RelTimeIndicatorConverter.scala     |  10 +-
 .../flink/table/plan/nodes/CommonJoin.scala     |  73 ++++
 .../table/plan/nodes/dataset/DataSetJoin.scala  |  47 ++-
 .../nodes/datastream/DataStreamWindowJoin.scala | 187 ++++++++++
 .../flink/table/plan/rules/FlinkRuleSets.scala  |   1 +
 .../datastream/DataStreamWindowJoinRule.scala   |  94 +++++
 .../runtime/join/ProcTimeWindowInnerJoin.scala  | 326 +++++++++++++++++
 .../table/runtime/join/WindowJoinUtil.scala     | 349 +++++++++++++++++++
 .../table/updateutils/UpdateCheckUtils.scala    | 128 +++++++
 .../table/api/scala/stream/sql/JoinITCase.scala | 117 +++++++
 .../table/api/scala/stream/sql/JoinTest.scala   | 235 +++++++++++++
 .../table/runtime/harness/JoinHarnessTest.scala | 236 +++++++++++++
 .../KeyedTwoInputStreamOperatorTestHarness.java |   9 +
 14 files changed, 1788 insertions(+), 133 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ba6c59e6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index c5a66b5..f026824 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -50,6 +50,7 @@ import 
org.apache.flink.table.runtime.{CRowInputJavaTupleOutputMapRunner, CRowIn
 import org.apache.flink.table.sinks.{AppendStreamTableSink, 
RetractStreamTableSink, TableSink, UpsertStreamTableSink}
 import org.apache.flink.table.sources.{DefinedRowtimeAttribute, 
StreamTableSource, TableSource}
 import org.apache.flink.table.typeutils.TypeCheckUtils
+import org.apache.flink.table.updateutils.UpdateCheckUtils
 import org.apache.flink.types.Row
 
 import _root_.scala.collection.JavaConverters._
@@ -173,10 +174,10 @@ abstract class StreamTableEnvironment(
         // optimize plan
         val optimizedPlan = optimize(table.getRelNode, updatesAsRetraction = 
false)
         // check for append only table
-        val isAppendOnlyTable = isAppendOnly(optimizedPlan)
+        val isAppendOnlyTable = UpdateCheckUtils.isAppendOnly(optimizedPlan)
         upsertSink.setIsAppendOnly(isAppendOnlyTable)
         // extract unique key fields
-        val tableKeys: Option[Array[String]] = 
getUniqueKeyFields(optimizedPlan)
+        val tableKeys: Option[Array[String]] = 
UpdateCheckUtils.getUniqueKeyFields(optimizedPlan)
         // check that we have keys if the table has changes (is not 
append-only)
         tableKeys match {
           case Some(keys) => upsertSink.setKeyFields(keys)
@@ -200,7 +201,7 @@ abstract class StreamTableEnvironment(
         // optimize plan
         val optimizedPlan = optimize(table.getRelNode, updatesAsRetraction = 
false)
         // verify table is an insert-only (append-only) table
-        if (!isAppendOnly(optimizedPlan)) {
+        if (!UpdateCheckUtils.isAppendOnly(optimizedPlan)) {
           throw new TableException(
             "AppendStreamTableSink requires that Table has only insert 
changes.")
         }
@@ -259,21 +260,6 @@ abstract class StreamTableEnvironment(
     }
   }
 
-  /** Validates that the plan produces only append changes. */
-  protected def isAppendOnly(plan: RelNode): Boolean = {
-    val appendOnlyValidator = new AppendOnlyValidator
-    appendOnlyValidator.go(plan)
-
-    appendOnlyValidator.isAppendOnly
-  }
-
-  /** Extracts the unique keys of the table produced by the plan. */
-  protected def getUniqueKeyFields(plan: RelNode): Option[Array[String]] = {
-    val keyExtractor = new UniqueKeyExtractor
-    keyExtractor.go(plan)
-    keyExtractor.keys
-  }
-
   /**
     * Creates a converter that maps the internal CRow type to Scala or Java 
Tuple2 with change flag.
     *
@@ -665,7 +651,7 @@ abstract class StreamTableEnvironment(
       (implicit tpe: TypeInformation[A]): DataStream[A] = {
 
     // if no change flags are requested, verify table is an insert-only 
(append-only) table.
-    if (!withChangeFlag && !isAppendOnly(logicalPlan)) {
+    if (!withChangeFlag && !UpdateCheckUtils.isAppendOnly(logicalPlan)) {
       throw new TableException(
         "Table is not an append-only table. " +
         "Use the toRetractStream() in order to handle add and retract 
messages.")
@@ -749,90 +735,5 @@ abstract class StreamTableEnvironment(
         s"$sqlPlan"
   }
 
-  private class AppendOnlyValidator extends RelVisitor {
-
-    var isAppendOnly = true
-
-    override def visit(node: RelNode, ordinal: Int, parent: RelNode): Unit = {
-      node match {
-        case s: DataStreamRel if s.producesUpdates =>
-          isAppendOnly = false
-        case _ =>
-          super.visit(node, ordinal, parent)
-      }
-    }
-  }
-
-  /** Identifies unique key fields in the output of a RelNode. */
-  private class UniqueKeyExtractor extends RelVisitor {
-
-    var keys: Option[Array[String]] = None
-
-    override def visit(node: RelNode, ordinal: Int, parent: RelNode): Unit = {
-      node match {
-        case c: DataStreamCalc =>
-          super.visit(node, ordinal, parent)
-          // check if input has keys
-          if (keys.isDefined) {
-            // track keys forward
-            val inNames = c.getInput.getRowType.getFieldNames
-            val inOutNames = c.getProgram.getNamedProjects.asScala
-              .map(p => {
-                c.getProgram.expandLocalRef(p.left) match {
-                    // output field is forwarded input field
-                  case i: RexInputRef => (i.getIndex, p.right)
-                    // output field is renamed input field
-                  case a: RexCall if a.getKind.equals(SqlKind.AS) =>
-                    a.getOperands.get(0) match {
-                      case ref: RexInputRef =>
-                        (ref.getIndex, p.right)
-                      case _ =>
-                        (-1, p.right)
-                    }
-                    // output field is not forwarded from input
-                  case _: RexNode => (-1, p.right)
-                }
-              })
-              // filter all non-forwarded fields
-              .filter(_._1 >= 0)
-              // resolve names of input fields
-              .map(io => (inNames.get(io._1), io._2))
-
-            // filter by input keys
-            val outKeys = inOutNames.filter(io => 
keys.get.contains(io._1)).map(_._2)
-            // check if all keys have been preserved
-            if (outKeys.nonEmpty && outKeys.length == keys.get.length) {
-              // all key have been preserved (but possibly renamed)
-              keys = Some(outKeys.toArray)
-            } else {
-              // some (or all) keys have been removed. Keys are no longer 
unique and removed
-              keys = None
-            }
-          }
-        case _: DataStreamOverAggregate =>
-          super.visit(node, ordinal, parent)
-          // keys are always forwarded by Over aggregate
-        case a: DataStreamGroupAggregate =>
-          // get grouping keys
-          val groupKeys = 
a.getRowType.getFieldNames.asScala.take(a.getGroupings.length)
-          keys = Some(groupKeys.toArray)
-        case w: DataStreamGroupWindowAggregate =>
-          // get grouping keys
-          val groupKeys =
-            
w.getRowType.getFieldNames.asScala.take(w.getGroupings.length).toArray
-          // get window start and end time
-          val windowStartEnd = w.getWindowProperties.map(_.name)
-          // we have only a unique key if at least one window property is 
selected
-          if (windowStartEnd.nonEmpty) {
-            keys = Some(groupKeys ++ windowStartEnd)
-          }
-        case _: DataStreamRel =>
-          // anything else does not forward keys or might duplicate key, so we 
can stop
-          keys = None
-      }
-    }
-
-  }
-
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ba6c59e6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
index 385cab2..32d6f01 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
@@ -165,8 +165,14 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) 
extends RelShuttle {
     LogicalProject.create(input, projects, fieldNames)
   }
 
-  override def visit(join: LogicalJoin): RelNode =
-    throw new TableException("Logical join in a stream environment is not 
supported yet.")
+  override def visit(join: LogicalJoin): RelNode = {
+    val left = join.getLeft.accept(this)
+    val right = join.getRight.accept(this)
+
+    LogicalJoin.create(left, right, join.getCondition, join.getVariablesSet, 
join.getJoinType)
+  }
 
   override def visit(correlate: LogicalCorrelate): RelNode = {
     // visit children and update inputs

http://git-wip-us.apache.org/repos/asf/flink/blob/ba6c59e6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonJoin.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonJoin.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonJoin.scala
new file mode 100644
index 0000000..7d0ca35
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonJoin.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes
+
+import org.apache.calcite.rel.RelWriter
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex.RexNode
+
+import scala.collection.JavaConverters._
+
+trait CommonJoin {
+
+  private[flink] def joinSelectionToString(inputType: RelDataType): String = {
+    inputType.getFieldNames.asScala.toList.mkString(", ")
+  }
+
+  private[flink] def joinConditionToString(
+      inputType: RelDataType,
+      joinCondition: RexNode,
+      expression: (RexNode, List[String], Option[List[RexNode]]) => String): 
String = {
+
+    val inFields = inputType.getFieldNames.asScala.toList
+    expression(joinCondition, inFields, None)
+  }
+
+  private[flink] def joinTypeToString(joinType: JoinRelType) = {
+    joinType match {
+      case JoinRelType.INNER => "InnerJoin"
+      case JoinRelType.LEFT => "LeftOuterJoin"
+      case JoinRelType.RIGHT => "RightOuterJoin"
+      case JoinRelType.FULL => "FullOuterJoin"
+    }
+  }
+
+  private[flink] def joinToString(
+      inputType: RelDataType,
+      joinCondition: RexNode,
+      joinType: JoinRelType,
+      expression: (RexNode, List[String], Option[List[RexNode]]) => String): 
String = {
+
+    s"${joinTypeToString(joinType)}" +
+      s"(where: (${joinConditionToString(inputType, joinCondition, 
expression)}), " +
+      s"join: (${joinSelectionToString(inputType)}))"
+  }
+
+  private[flink] def joinExplainTerms(
+      pw: RelWriter,
+      inputType: RelDataType,
+      joinCondition: RexNode,
+      joinType: JoinRelType,
+      expression: (RexNode, List[String], Option[List[RexNode]]) => String): 
RelWriter = {
+
+    pw.item("where", joinConditionToString(inputType, joinCondition, 
expression))
+      .item("join", joinSelectionToString(inputType))
+      .item("joinType", joinTypeToString(joinType))
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ba6c59e6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
index a6c31d3..e8f3b82 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
@@ -31,11 +31,11 @@ import org.apache.flink.api.java.DataSet
 import org.apache.flink.table.api.{BatchTableEnvironment, TableException}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.FunctionCodeGenerator
+import org.apache.flink.table.plan.nodes.CommonJoin
 import org.apache.flink.table.runtime.FlatJoinRunner
 import org.apache.flink.types.Row
 
 import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
 /**
@@ -55,6 +55,7 @@ class DataSetJoin(
     joinHint: JoinHint,
     ruleDescription: String)
   extends BiRel(cluster, traitSet, leftNode, rightNode)
+  with CommonJoin
   with DataSetRel {
 
   override def deriveRowType() = rowRelDataType
@@ -76,14 +77,20 @@ class DataSetJoin(
   }
 
   override def toString: String = {
-    s"$joinTypeToString(where: ($joinConditionToString), join: 
($joinSelectionToString))"
+    joinToString(
+      joinRowType,
+      joinCondition,
+      joinType,
+      getExpressionString)
   }
 
   override def explainTerms(pw: RelWriter): RelWriter = {
-    super.explainTerms(pw)
-      .item("where", joinConditionToString)
-      .item("join", joinSelectionToString)
-      .item("joinType", joinTypeToString)
+    joinExplainTerms(
+      super.explainTerms(pw),
+      joinRowType,
+      joinCondition,
+      joinType,
+      getExpressionString)
   }
 
   override def computeSelfCost (planner: RelOptPlanner, metadata: 
RelMetadataQuery): RelOptCost = {
@@ -116,7 +123,8 @@ class DataSetJoin(
         "Joins should have at least one equality condition.\n" +
           s"\tLeft: ${left.toString},\n" +
           s"\tRight: ${right.toString},\n" +
-          s"\tCondition: ($joinConditionToString)"
+          s"\tCondition: (${joinConditionToString(joinRowType,
+            joinCondition, getExpressionString)})"
       )
     }
     else {
@@ -138,7 +146,8 @@ class DataSetJoin(
             "Equality join predicate on incompatible types.\n" +
               s"\tLeft: ${left.toString},\n" +
               s"\tRight: ${right.toString},\n" +
-              s"\tCondition: ($joinConditionToString)"
+              s"\tCondition: (${joinConditionToString(joinRowType,
+                joinCondition, getExpressionString)})"
           )
         }
       })
@@ -197,7 +206,9 @@ class DataSetJoin(
       genFunction.code,
       genFunction.returnType)
 
-    val joinOpName = s"where: ($joinConditionToString), join: 
($joinSelectionToString)"
+    val joinOpName =
+      s"where: (${joinConditionToString(joinRowType, joinCondition, 
getExpressionString)}), " +
+        s"join: (${joinSelectionToString(joinRowType)})"
 
     joinOperator
       .where(leftKeys.toArray: _*)
@@ -205,22 +216,4 @@ class DataSetJoin(
       .`with`(joinFun)
       .name(joinOpName)
   }
-
-  private def joinSelectionToString: String = {
-    getRowType.getFieldNames.asScala.toList.mkString(", ")
-  }
-
-  private def joinConditionToString: String = {
-
-    val inFields = joinRowType.getFieldNames.asScala.toList
-    getExpressionString(joinCondition, inFields, None)
-  }
-
-  private def joinTypeToString = joinType match {
-    case JoinRelType.INNER => "InnerJoin"
-    case JoinRelType.LEFT=> "LeftOuterJoin"
-    case JoinRelType.RIGHT => "RightOuterJoin"
-    case JoinRelType.FULL => "FullOuterJoin"
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ba6c59e6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
new file mode 100644
index 0000000..1315a79
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.calcite.rex.RexNode
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, 
TableException}
+import org.apache.flink.table.plan.nodes.CommonJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.join.{ProcTimeWindowInnerJoin, 
WindowJoinUtil}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.table.updateutils.UpdateCheckUtils
+
+/**
+  * Flink RelNode for a time-windowed join of two streams.
+  */
+class DataStreamWindowJoin(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    leftNode: RelNode,
+    rightNode: RelNode,
+    joinCondition: RexNode,
+    joinType: JoinRelType,
+    leftSchema: RowSchema,
+    rightSchema: RowSchema,
+    schema: RowSchema,
+    isRowTime: Boolean,
+    leftLowerBound: Long,
+    leftUpperBound: Long,
+    remainCondition: Option[RexNode],
+    ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+    with CommonJoin
+    with DataStreamRel {
+
+  override def deriveRowType() = schema.logicalType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): 
RelNode = {
+    new DataStreamWindowJoin(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      inputs.get(1),
+      joinCondition,
+      joinType,
+      leftSchema,
+      rightSchema,
+      schema,
+      isRowTime,
+      leftLowerBound,
+      leftUpperBound,
+      remainCondition,
+      ruleDescription)
+  }
+
+  override def toString: String = {
+    joinToString(
+      schema.logicalType,
+      joinCondition,
+      joinType,
+      getExpressionString)
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    joinExplainTerms(
+      super.explainTerms(pw),
+      schema.logicalType,
+      joinCondition,
+      joinType,
+      getExpressionString)
+  }
+
+  override def translateToPlan(
+      tableEnv: StreamTableEnvironment,
+      queryConfig: StreamQueryConfig): DataStream[CRow] = {
+
+    val config = tableEnv.getConfig
+
+    val isLeftAppendOnly = UpdateCheckUtils.isAppendOnly(left)
+    val isRightAppendOnly = UpdateCheckUtils.isAppendOnly(right)
+    if (!isLeftAppendOnly || !isRightAppendOnly) {
+      throw new TableException(
+        "Windowed stream join does not support updates.")
+    }
+
+    val leftDataStream = 
left.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig)
+    val rightDataStream = 
right.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig)
+
+    // get the equality keys and other condition
+    val joinInfo = JoinInfo.of(leftNode, rightNode, joinCondition)
+    val leftKeys = joinInfo.leftKeys.toIntArray
+    val rightKeys = joinInfo.rightKeys.toIntArray
+
+    // generate join function
+    val joinFunction =
+    WindowJoinUtil.generateJoinFunction(
+      config,
+      joinType,
+      leftSchema.physicalTypeInfo,
+      rightSchema.physicalTypeInfo,
+      schema,
+      remainCondition,
+      ruleDescription)
+
+    joinType match {
+      case JoinRelType.INNER =>
+        isRowTime match {
+          case false =>
+            // Proctime JoinCoProcessFunction
+            createProcTimeInnerJoinFunction(
+              leftDataStream,
+              rightDataStream,
+              joinFunction.name,
+              joinFunction.code,
+              leftKeys,
+              rightKeys
+            )
+          case true =>
+            // RowTime JoinCoProcessFunction
+            throw new TableException(
+              "RowTime inner join between stream and stream is not supported 
yet.")
+        }
+      case JoinRelType.FULL =>
+        throw new TableException(
+          "Full join between stream and stream is not supported yet.")
+      case JoinRelType.LEFT =>
+        throw new TableException(
+          "Left join between stream and stream is not supported yet.")
+      case JoinRelType.RIGHT =>
+        throw new TableException(
+          "Right join between stream and stream is not supported yet.")
+    }
+  }
+
+  def createProcTimeInnerJoinFunction(
+      leftDataStream: DataStream[CRow],
+      rightDataStream: DataStream[CRow],
+      joinFunctionName: String,
+      joinFunctionCode: String,
+      leftKeys: Array[Int],
+      rightKeys: Array[Int]): DataStream[CRow] = {
+
+    val returnTypeInfo = CRowTypeInfo(schema.physicalTypeInfo)
+
+    val procInnerJoinFunc = new ProcTimeWindowInnerJoin(
+      leftLowerBound,
+      leftUpperBound,
+      leftSchema.physicalTypeInfo,
+      rightSchema.physicalTypeInfo,
+      joinFunctionName,
+      joinFunctionCode)
+
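+    // If there are equi-join keys, key both inputs on them; otherwise route all records to a
+    // single task via a constant key (parallelism 1).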
+    if (!leftKeys.isEmpty) {
+      leftDataStream.connect(rightDataStream)
+        .keyBy(leftKeys, rightKeys)
+        .process(procInnerJoinFunc)
+        .returns(returnTypeInfo)
+    } else {
+      leftDataStream.connect(rightDataStream)
+        .keyBy(new NullByteKeySelector[CRow](), new 
NullByteKeySelector[CRow]())
+        .process(procInnerJoinFunc)
+        .setParallelism(1)
+        .setMaxParallelism(1)
+        .returns(returnTypeInfo)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ba6c59e6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index ebfbeb9..90bd624 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -191,6 +191,7 @@ object FlinkRuleSets {
     DataStreamUnionRule.INSTANCE,
     DataStreamValuesRule.INSTANCE,
     DataStreamCorrelateRule.INSTANCE,
+    DataStreamWindowJoinRule.INSTANCE,
     StreamTableSourceScanRule.INSTANCE
   )
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ba6c59e6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala
new file mode 100644
index 0000000..c7a190f
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.datastream.DataStreamWindowJoin
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.join.WindowJoinUtil
+
+class DataStreamWindowJoinRule
+  extends ConverterRule(
+    classOf[FlinkLogicalJoin],
+    FlinkConventions.LOGICAL,
+    FlinkConventions.DATASTREAM,
+    "DataStreamJoinRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val join: FlinkLogicalJoin = call.rel(0).asInstanceOf[FlinkLogicalJoin]
+    val joinInfo = join.analyzeCondition
+
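+    // The rule matches only if the non-equi part of the join condition can be interpreted as
+    // time bounds on the time attributes of both inputs; otherwise analyzeTimeBoundary throws.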
+    try {
+      WindowJoinUtil.analyzeTimeBoundary(
+        joinInfo.getRemaining(join.getCluster.getRexBuilder),
+        join.getLeft.getRowType.getFieldCount,
+        new RowSchema(join.getRowType),
+        join.getCluster.getRexBuilder,
+        TableConfig.DEFAULT)
+      true
+    } catch {
+      case _: TableException =>
+        false
+    }
+  }
+
+  override def convert(rel: RelNode): RelNode = {
+
+    val join: FlinkLogicalJoin = rel.asInstanceOf[FlinkLogicalJoin]
+    val traitSet: RelTraitSet = 
rel.getTraitSet.replace(FlinkConventions.DATASTREAM)
+    val convLeft: RelNode = RelOptRule.convert(join.getInput(0), 
FlinkConventions.DATASTREAM)
+    val convRight: RelNode = RelOptRule.convert(join.getInput(1), 
FlinkConventions.DATASTREAM)
+    val joinInfo = join.analyzeCondition
+    val leftRowSchema = new RowSchema(convLeft.getRowType)
+    val rightRowSchema = new RowSchema(convRight.getRowType)
+
+    val (isRowTime, leftLowerBoundary, leftUpperBoundary, remainCondition) =
+      WindowJoinUtil.analyzeTimeBoundary(
+        joinInfo.getRemaining(join.getCluster.getRexBuilder),
+        leftRowSchema.logicalArity,
+        new RowSchema(join.getRowType),
+        join.getCluster.getRexBuilder,
+        TableConfig.DEFAULT)
+
+    new DataStreamWindowJoin(
+      rel.getCluster,
+      traitSet,
+      convLeft,
+      convRight,
+      join.getCondition,
+      join.getJoinType,
+      leftRowSchema,
+      rightRowSchema,
+      new RowSchema(rel.getRowType),
+      isRowTime,
+      leftLowerBoundary,
+      leftUpperBoundary,
+      remainCondition,
+      description)
+  }
+}
+
+object DataStreamWindowJoinRule {
+  val INSTANCE: RelOptRule = new DataStreamWindowJoinRule
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ba6c59e6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeWindowInnerJoin.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeWindowInnerJoin.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeWindowInnerJoin.scala
new file mode 100644
index 0000000..97d5ccc
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeWindowInnerJoin.scala
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+/**
+  * A CoProcessFunction that joins two streams; currently only inner joins are supported.
+  *
+  * @param leftLowerBound
+  *        the left stream lower bound, and -leftLowerBound is the right 
stream upper bound
+  * @param leftUpperBound
+  *        the left stream upper bound, and -leftUpperBound is the right 
stream lower bound
+  * @param element1Type  the input type of left stream
+  * @param element2Type  the input type of right stream
+  * @param genJoinFuncName    name of the generated function that evaluates the non-equi condition
+  * @param genJoinFuncCode    code of the generated function that evaluates the non-equi condition
+  *
+  */
+class ProcTimeWindowInnerJoin(
+    private val leftLowerBound: Long,
+    private val leftUpperBound: Long,
+    private val element1Type: TypeInformation[Row],
+    private val element2Type: TypeInformation[Row],
+    private val genJoinFuncName: String,
+    private val genJoinFuncCode: String)
+  extends CoProcessFunction[CRow, CRow, CRow]
+    with Compiler[FlatJoinFunction[Row, Row, Row]] {
+
+  private var cRowWrapper: CRowWrappingCollector = _
+
+  /** function that evaluates the remaining non-equi join condition **/
+  private var joinFunction: FlatJoinFunction[Row, Row, Row] = _
+
+  /** temporary list of expired timestamps to be removed from state **/
+  private var listToRemove: JList[Long] = _
+
+  /** state to hold rows of the left stream, keyed by their processing time **/
+  private var row1MapState: MapState[Long, JList[Row]] = _
+  /** state to hold rows of the right stream, keyed by their processing time **/
+  private var row2MapState: MapState[Long, JList[Row]] = _
+
+  /** state to record last timer of left stream, 0 means no timer **/
+  private var timerState1: ValueState[Long] = _
+  /** state to record last timer of right stream, 0 means no timer **/
+  private var timerState2: ValueState[Long] = _
+
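+  // For example (illustrative values): with leftLowerBound = -5000 and leftUpperBound = 3000,
+  // a left row may join right rows that arrive up to 5 seconds later and a right row may join
+  // left rows that arrive up to 3 seconds later, so the streams are kept for 5000 ms / 3000 ms.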
+  private val leftStreamWinSize: Long = if (leftLowerBound < 0) 
-leftLowerBound else 0
+  private val rightStreamWinSize: Long = if (leftUpperBound > 0) 
leftUpperBound else 0
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  override def open(config: Configuration) {
+    LOG.debug(s"Compiling JoinFunction: $genJoinFuncName \n\n " +
+      s"Code:\n$genJoinFuncCode")
+    val clazz = compile(
+      getRuntimeContext.getUserCodeClassLoader,
+      genJoinFuncName,
+      genJoinFuncCode)
+    LOG.debug("Instantiating JoinFunction.")
+    joinFunction = clazz.newInstance()
+
+    listToRemove = new util.ArrayList[Long]()
+    cRowWrapper = new CRowWrappingCollector()
+    cRowWrapper.setChange(true)
+
+    // initialize row state
+    val rowListTypeInfo1: TypeInformation[JList[Row]] = new 
ListTypeInfo[Row](element1Type)
+    val mapStateDescriptor1: MapStateDescriptor[Long, JList[Row]] =
+      new MapStateDescriptor[Long, JList[Row]]("row1mapstate",
+        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], 
rowListTypeInfo1)
+    row1MapState = getRuntimeContext.getMapState(mapStateDescriptor1)
+
+    val rowListTypeInfo2: TypeInformation[JList[Row]] = new 
ListTypeInfo[Row](element2Type)
+    val mapStateDescriptor2: MapStateDescriptor[Long, JList[Row]] =
+      new MapStateDescriptor[Long, JList[Row]]("row2mapstate",
+        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], 
rowListTypeInfo2)
+    row2MapState = getRuntimeContext.getMapState(mapStateDescriptor2)
+
+    // initialize timer state
+    val valueStateDescriptor1: ValueStateDescriptor[Long] =
+      new ValueStateDescriptor[Long]("timervaluestate1", classOf[Long])
+    timerState1 = getRuntimeContext.getState(valueStateDescriptor1)
+
+    val valueStateDescriptor2: ValueStateDescriptor[Long] =
+      new ValueStateDescriptor[Long]("timervaluestate2", classOf[Long])
+    timerState2 = getRuntimeContext.getState(valueStateDescriptor2)
+  }
+
+  /**
+    * Process left stream records
+    *
+    * @param valueC The input value.
+    * @param ctx   The ctx to register timer or get current time
+    * @param out   The collector for returning result values.
+    *
+    */
+  override def processElement1(
+      valueC: CRow,
+      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+      out: Collector[CRow]): Unit = {
+
+    processElement(
+      valueC,
+      ctx,
+      out,
+      leftStreamWinSize,
+      timerState1,
+      row1MapState,
+      row2MapState,
+      -leftUpperBound,     // right stream lower
+      -leftLowerBound,     // right stream upper
+      true
+    )
+  }
+
+  /**
+    * Process right stream records
+    *
+    * @param valueC The input value.
+    * @param ctx   The ctx to register timer or get current time
+    * @param out   The collector for returning result values.
+    *
+    */
+  override def processElement2(
+      valueC: CRow,
+      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+      out: Collector[CRow]): Unit = {
+
+    processElement(
+      valueC,
+      ctx,
+      out,
+      rightStreamWinSize,
+      timerState2,
+      row2MapState,
+      row1MapState,
+      leftLowerBound,    // left stream lower
+      leftUpperBound,    // left stream upper
+      false
+    )
+  }
+
+  /**
+    * Called when a processing-time timer fires.
+    * Expires left/right records that are older than (current time - window size).
+    *
+    * @param timestamp The timestamp of the firing timer.
+    * @param ctx       The ctx to register timer or get current time
+    * @param out       The collector for returning result values.
+    */
+  override def onTimer(
+      timestamp: Long,
+      ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext,
+      out: Collector[CRow]): Unit = {
+
+    if (timerState1.value == timestamp) {
+      expireOutTimeRow(
+        timestamp,
+        leftStreamWinSize,
+        row1MapState,
+        timerState1,
+        ctx
+      )
+    }
+
+    if (timerState2.value == timestamp) {
+      expireOutTimeRow(
+        timestamp,
+        rightStreamWinSize,
+        row2MapState,
+        timerState2,
+        ctx
+      )
+    }
+  }
+
+  /**
+    * Puts an element from the input stream into state, joins it with the elements of the other
+    * stream's state that lie within the time bounds and emits the results, and registers a
+    * timer for the current record if no timer is registered yet.
+    */
+  private def processElement(
+      valueC: CRow,
+      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+      out: Collector[CRow],
+      winSize: Long,
+      timerState: ValueState[Long],
+      rowMapState: MapState[Long, JList[Row]],
+      oppoRowMapState: MapState[Long, JList[Row]],
+      oppoLowerBound: Long,
+      oppoUpperBound: Long,
+      isLeft: Boolean): Unit = {
+
+    cRowWrapper.out = out
+
+    val value = valueC.row
+
+    val curProcessTime = ctx.timerService.currentProcessingTime
+    val oppoLowerTime = curProcessTime + oppoLowerBound
+    val oppoUpperTime = curProcessTime + oppoUpperBound
+
+    // the element only needs to be stored if the window size is not zero
+    if (winSize != 0) {
+      // register a timer to expire the element
+      if (timerState.value == 0) {
+        ctx.timerService.registerProcessingTimeTimer(curProcessTime + winSize 
+ 1)
+        timerState.update(curProcessTime + winSize + 1)
+      }
+
+      var rowList = rowMapState.get(curProcessTime)
+      if (rowList == null) {
+        rowList = new util.ArrayList[Row]()
+      }
+      rowList.add(value)
+      rowMapState.put(curProcessTime, rowList)
+
+    }
+
+    // iterate over the elements of the other stream and join those within the time bounds
+    val oppositeKeyIter = oppoRowMapState.keys().iterator()
+    while (oppositeKeyIter.hasNext) {
+      val eleTime = oppositeKeyIter.next()
+      if (eleTime < oppoLowerTime) {
+        listToRemove.add(eleTime)
+      } else if (eleTime <= oppoUpperTime) {
+        val oppoRowList = oppoRowMapState.get(eleTime)
+        var i = 0
+        if (isLeft) {
+          while (i < oppoRowList.size) {
+            joinFunction.join(value, oppoRowList.get(i), cRowWrapper)
+            i += 1
+          }
+        } else {
+          while (i < oppoRowList.size) {
+            joinFunction.join(oppoRowList.get(i), value, cRowWrapper)
+            i += 1
+          }
+        }
+      }
+    }
+
+    // remove expired records of the other stream from state
+    var i = listToRemove.size - 1
+    while (i >= 0) {
+      oppoRowMapState.remove(listToRemove.get(i))
+      i -= 1
+    }
+    listToRemove.clear()
+  }
+
+  /**
+    * Removes records which are outside the join window from the state.
+    * Registers a new timer if the state still holds records after the 
clean-up.
+    */
+  private def expireOutTimeRow(
+      curTime: Long,
+      winSize: Long,
+      rowMapState: MapState[Long, JList[Row]],
+      timerState: ValueState[Long],
+      ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext): Unit = {
+
+    val expiredTime = curTime - winSize
+    val keyIter = rowMapState.keys().iterator()
+    var nextTimer: Long = 0
+    // Search for expired timestamps.
+    // If we find a non-expired timestamp, remember the timestamp and leave 
the loop.
+    // This way we find all expired timestamps if they are sorted without 
doing a full pass.
+    while (keyIter.hasNext && nextTimer == 0) {
+      val recordTime = keyIter.next
+      if (recordTime < expiredTime) {
+        listToRemove.add(recordTime)
+      } else {
+        nextTimer = recordTime
+      }
+    }
+
+    // Remove expired records from state
+    var i = listToRemove.size - 1
+    while (i >= 0) {
+      rowMapState.remove(listToRemove.get(i))
+      i -= 1
+    }
+    listToRemove.clear()
+
+    // If the state has non-expired timestamps, register a new timer.
+    // Otherwise clean the complete state for this input.
+    if (nextTimer != 0) {
+      ctx.timerService.registerProcessingTimeTimer(nextTimer + winSize + 1)
+      timerState.update(nextTimer + winSize + 1)
+    } else {
+      timerState.clear()
+      rowMapState.clear()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ba6c59e6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
new file mode 100644
index 0000000..fabeeba
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.math.{BigDecimal => JBigDecimal}
+import java.util
+
+import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer}
+import org.apache.flink.table.plan.schema.{RowSchema, TimeIndicatorRelDataType}
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConverters._
+
+/**
+  * A utility class that helps analyze windowed join conditions and generate join function code.
+  */
+object WindowJoinUtil {
+
+  /**
+    * Analyzes the time condition to extract the time boundaries for each stream, determines
+    * whether the join is on row time or processing time, and returns the remaining condition.
+    *
+    * @param  condition           join condition
+    * @param  leftLogicalFieldCnt left stream logical field num
+    * @param  inputSchema         join rowtype schema
+    * @param  rexBuilder          util to build rexNode
+    * @param  config              table environment config
+    * @return isRowTime, left lower boundary, left upper boundary, remain condition
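+    *         e.g., for the non-equi condition t1.proctime >= t2.proctime - INTERVAL '5' SECOND
+    *         AND t1.proctime <= t2.proctime + INTERVAL '3' SECOND (illustrative attribute names)
+    *         this yields (false, -5000, 3000, None)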
+    */
+  private[flink] def analyzeTimeBoundary(
+      condition: RexNode,
+      leftLogicalFieldCnt: Int,
+      inputSchema: RowSchema,
+      rexBuilder: RexBuilder,
+      config: TableConfig): (Boolean, Long, Long, Option[RexNode]) = {
+
+    // Converts the condition to conjunctive normal form (CNF)
+    val cnfCondition = RexUtil.toCnf(rexBuilder, condition)
+
+    // split the condition into time indicator condition and other condition
+    val (timeTerms, remainTerms) = cnfCondition match {
+      case c: RexCall if cnfCondition.getKind == SqlKind.AND =>
+        c.getOperands.asScala
+          .map(analyzeCondtionTermType(_, leftLogicalFieldCnt, 
inputSchema.logicalType))
+          .reduceLeft((l, r) => {
+            (l._1 ++ r._1, l._2 ++ r._2)
+          })
+      case _ =>
+        throw new TableException("A time-based stream join requires exactly " +
+          "two join predicates that bound the time in both directions.")
+    }
+
+    if (timeTerms.size != 2) {
+      throw new TableException("A time-based stream join requires exactly " +
+        "two join predicates that bound the time in both directions.")
+    }
+
+    // extract time offset from the time indicator condition
+    val streamTimeOffsets =
+    timeTerms.map(x => extractTimeOffsetFromCondition(x._3, x._2, rexBuilder, 
config))
+
+    val (leftLowerBound, leftUpperBound) =
+      streamTimeOffsets match {
+        case Seq((x, true), (y, false)) => (x, y)
+        case Seq((x, false), (y, true)) => (y, x)
+        case _ =>
+          throw new TableException(
+            "Time-based join conditions must reference the time attribute of 
both input tables.")
+      }
+
+    // compose the remaining conditions into a single condition
+    val remainCondition =
+    remainTerms match {
+      case Seq() => None
+      case _ =>
+        // Converts logical field references to physical ones.
+        Some(remainTerms.map(inputSchema.mapRexNode).reduceLeft((l, r) => {
+          RelOptUtil.andJoinFilters(rexBuilder, l, r)
+        }))
+    }
+
+    val isRowTime: Boolean = timeTerms(0)._1 match {
+      case x if FlinkTypeFactory.isProctimeIndicatorType(x) => false
+      case _ => true
+    }
+    (isRowTime, leftLowerBound, leftUpperBound, remainCondition)
+  }
+
+  /**
+    * Splits the join condition terms into time conditions and remaining (non-time) conditions.
+    *
+    * @return (Seq(timeTerms), Seq(remainTerms))
+    */
+  private def analyzeCondtionTermType(
+      conditionTerm: RexNode,
+      leftFieldCount: Int,
+      inputType: RelDataType): (Seq[(RelDataType, Boolean, RexNode)], 
Seq[RexNode]) = {
+
+    conditionTerm match {
+      case c: RexCall if Seq(SqlKind.GREATER_THAN, 
SqlKind.GREATER_THAN_OR_EQUAL,
+        SqlKind.LESS_THAN, SqlKind.LESS_THAN_OR_EQUAL).contains(c.getKind) =>
+        val timeIndicators = extractTimeIndicatorAccesses(c, leftFieldCount, 
inputType)
+        timeIndicators match {
+          case Seq() =>
+            (Seq(), Seq(c))
+          case Seq(v1, v2) =>
+            if (v1._1 != v2._1) {
+              throw new TableException(
+                "Both time attributes in a join condition must be of the same 
type.")
+            }
+            if (v1._2 == v2._2) {
+              throw new TableException("Time-based join conditions " +
+                "must reference the time attribute of both input tables.")
+            }
+            (Seq((v1._1, v1._2, c)), Seq())
+          case _ =>
+            throw new TableException(
+              "Time-based join conditions must reference the time attribute of 
both input tables.")
+        }
+      case other =>
+        val timeIndicators = extractTimeIndicatorAccesses(other, 
leftFieldCount, inputType)
+        timeIndicators match {
+          case Seq() =>
+            (Seq(), Seq(other))
+          case _ =>
+            throw new TableException(
+              "Time indicators cannot be used in non-time conditions.")
+        }
+    }
+  }
+
+  /**
+    * Extracts all time indicator attributes that are accessed in an 
expression.
+    *
+    * @return seq(timeType, is left input time indicator)
+    */
+  def extractTimeIndicatorAccesses(
+      expression: RexNode,
+      leftFieldCount: Int,
+      inputType: RelDataType): Seq[(RelDataType, Boolean)] = {
+
+    expression match {
+      case i: RexInputRef =>
+        val idx = i.getIndex
+        inputType.getFieldList.get(idx).getType match {
+          case t: TimeIndicatorRelDataType if idx < leftFieldCount =>
+            // left table time indicator
+            Seq((t, true))
+          case t: TimeIndicatorRelDataType =>
+            // right table time indicator
+            Seq((t, false))
+          case _ => Seq()
+        }
+      case c: RexCall =>
+        c.operands.asScala
+          .map(extractTimeIndicatorAccesses(_, leftFieldCount, inputType))
+          .reduce(_ ++ _)
+      case _ => Seq()
+    }
+  }
+
+  /**
+    * Computes the absolute bound on the left operand of a comparison 
expression and
+    * whether the bound is an upper or lower bound.
+    *
+    * @return window boundary, is left lower bound
+    */
+  def extractTimeOffsetFromCondition(
+      timeTerm: RexNode,
+      isLeftExprBelongLeftTable: Boolean,
+      rexBuilder: RexBuilder,
+      config: TableConfig): (Long, Boolean) = {
+
+    val timeCall: RexCall = timeTerm.asInstanceOf[RexCall]
+
+    val isLeftLowerBound: Boolean =
+      timeTerm.getKind match {
+        // e.g. a.proctime > b.proctime - 5 sec: this is the lower bound of a and the value is -5
+        // e.g. b.proctime > a.proctime - 5 sec: this is not a lower bound of a but an upper bound
+        case kind@(SqlKind.GREATER_THAN | SqlKind.GREATER_THAN_OR_EQUAL) =>
+          isLeftExprBelongLeftTable
+        // e.g. a.proctime < b.proctime + 5 sec, then the upper bound of a is 5
+        case kind@(SqlKind.LESS_THAN | SqlKind.LESS_THAN_OR_EQUAL) =>
+          !isLeftExprBelongLeftTable
+        case _ =>
+          throw new TableException("Unsupported time-condition.")
+      }
+
+    val (leftLiteral, rightLiteral) =
+      reduceTimeExpression(
+        timeCall.operands.get(0),
+        timeCall.operands.get(1),
+        rexBuilder,
+        config)
+    val tmpTimeOffset: Long =
+      if (isLeftExprBelongLeftTable) rightLiteral - leftLiteral else 
leftLiteral - rightLiteral
+
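+    // Strict comparisons tighten the boundary by 1 ms, e.g. a.proctime < b.proctime + 5 sec
+    // yields an upper bound of 4999 ms (illustrative example).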
+    val boundary =
+      tmpTimeOffset.signum * (
+        if (timeTerm.getKind == SqlKind.LESS_THAN || timeTerm.getKind == 
SqlKind.GREATER_THAN) {
+          tmpTimeOffset.abs - 1
+        } else {
+          tmpTimeOffset.abs
+        })
+
+    (boundary, isLeftLowerBound)
+  }
+
+  /**
+    * Calculates the time boundary by replacing the time attribute by a zero 
literal
+    * and reducing the expression.
+    * For example:
+    * b.proctime - interval '1' second - interval '2' second will be 
translated to
+    * 0 - 1000 - 2000
+    */
+  private def reduceTimeExpression(
+      leftRexNode: RexNode,
+      rightRexNode: RexNode,
+      rexBuilder: RexBuilder,
+      config: TableConfig): (Long, Long) = {
+
+    /**
+      * replace the rowtime/proctime with zero literal.
+      */
+    def replaceTimeFieldWithLiteral(expr: RexNode): RexNode = {
+      expr match {
+        case c: RexCall =>
+          // replace in call operands
+          val newOps = 
c.operands.asScala.map(replaceTimeFieldWithLiteral(_)).asJava
+          rexBuilder.makeCall(c.getType, c.getOperator, newOps)
+        case i: RexInputRef if FlinkTypeFactory.isTimeIndicatorType(i.getType) 
=>
+          // replace with timestamp
+          rexBuilder.makeZeroLiteral(expr.getType)
+        case _: RexInputRef =>
+          throw new TableException("Time join condition may only reference 
time indicator fields.")
+        case _ => expr
+      }
+    }
+
+    val literalLeftRex = replaceTimeFieldWithLiteral(leftRexNode)
+    val literalRightRex = replaceTimeFieldWithLiteral(rightRexNode)
+
+    val exprReducer = new ExpressionReducer(config)
+    val originList = new util.ArrayList[RexNode]()
+    originList.add(literalLeftRex)
+    originList.add(literalRightRex)
+    val reduceList = new util.ArrayList[RexNode]()
+    exprReducer.reduce(rexBuilder, originList, reduceList)
+
+    val literals = reduceList.asScala.map(f => f match {
+      case literal: RexLiteral =>
+        literal.getValue2.asInstanceOf[Long]
+      case _ =>
+        throw TableException(
+          "Time condition may only consist of time attributes, literals, and 
arithmetic operators.")
+    })
+
+    (literals(0), literals(1))
+  }
+
+
+  /**
+    * Generates a join function that applies the remaining non-equi join condition.
+    *
+    * @param  config          table env config
+    * @param  joinType        join type to determine whether the input can be null
+    * @param  leftType        left stream type
+    * @param  rightType       right stream type
+    * @param  returnType      return type
+    * @param  otherCondition  non-equi condition
+    * @param  ruleDescription rule description
+    */
+  private[flink] def generateJoinFunction(
+      config: TableConfig,
+      joinType: JoinRelType,
+      leftType: TypeInformation[Row],
+      rightType: TypeInformation[Row],
+      returnType: RowSchema,
+      otherCondition: Option[RexNode],
+      ruleDescription: String) = {
+
+    // whether input can be null
+    val nullCheck = joinType match {
+      case JoinRelType.INNER => false
+      case JoinRelType.LEFT => true
+      case JoinRelType.RIGHT => true
+      case JoinRelType.FULL => true
+    }
+
+    // generate other non-equi function code
+    val generator = new CodeGenerator(
+      config,
+      nullCheck,
+      leftType,
+      Some(rightType))
+
+    val conversion = generator.generateConverterResultExpression(
+      returnType.physicalTypeInfo,
+      returnType.physicalType.getFieldNames.asScala)
+
+    // if other condition is none, then output the result directly
+    val body = otherCondition match {
+      case None =>
+        s"""
+           |${conversion.code}
+           |${generator.collectorTerm}.collect(${conversion.resultTerm});
+           |""".stripMargin
+      case Some(remainCondition) =>
+        val genCond = generator.generateExpression(remainCondition)
+        s"""
+           |${genCond.code}
+           |if (${genCond.resultTerm}) {
+           |  ${conversion.code}
+           |  ${generator.collectorTerm}.collect(${conversion.resultTerm});
+           |}
+           |""".stripMargin
+    }
+
+    generator.generateFunction(
+      ruleDescription,
+      classOf[FlatJoinFunction[Row, Row, Row]],
+      body,
+      returnType.physicalTypeInfo)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ba6c59e6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/updateutils/UpdateCheckUtils.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/updateutils/UpdateCheckUtils.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/updateutils/UpdateCheckUtils.scala
new file mode 100644
index 0000000..769ba55
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/updateutils/UpdateCheckUtils.scala
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.updateutils
+
+import org.apache.calcite.rel.{RelNode, RelVisitor}
+import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.table.plan.nodes.datastream._
+import _root_.scala.collection.JavaConverters._
+
+object UpdateCheckUtils {
+
+  /** Validates that the plan produces only append changes. */
+  def isAppendOnly(plan: RelNode): Boolean = {
+    val appendOnlyValidator = new AppendOnlyValidator
+    appendOnlyValidator.go(plan)
+
+    appendOnlyValidator.isAppendOnly
+  }
+
+  /** Extracts the unique keys of the table produced by the plan. */
+  def getUniqueKeyFields(plan: RelNode): Option[Array[String]] = {
+    val keyExtractor = new UniqueKeyExtractor
+    keyExtractor.go(plan)
+    keyExtractor.keys
+  }
+
+  private class AppendOnlyValidator extends RelVisitor {
+
+    var isAppendOnly = true
+
+    override def visit(node: RelNode, ordinal: Int, parent: RelNode): Unit = {
+      node match {
+        case s: DataStreamRel if s.producesUpdates =>
+          isAppendOnly = false
+        case _ =>
+          super.visit(node, ordinal, parent)
+      }
+    }
+  }
+
+  /** Identifies unique key fields in the output of a RelNode. */
+  private class UniqueKeyExtractor extends RelVisitor {
+
+    var keys: Option[Array[String]] = None
+
+    override def visit(node: RelNode, ordinal: Int, parent: RelNode): Unit = {
+      node match {
+        case c: DataStreamCalc =>
+          super.visit(node, ordinal, parent)
+          // check if input has keys
+          if (keys.isDefined) {
+            // track keys forward
+            val inNames = c.getInput.getRowType.getFieldNames
+            val inOutNames = c.getProgram.getNamedProjects.asScala
+              .map(p => {
+                c.getProgram.expandLocalRef(p.left) match {
+                  // output field is forwarded input field
+                  case i: RexInputRef => (i.getIndex, p.right)
+                  // output field is renamed input field
+                  case a: RexCall if a.getKind.equals(SqlKind.AS) =>
+                    a.getOperands.get(0) match {
+                      case ref: RexInputRef =>
+                        (ref.getIndex, p.right)
+                      case _ =>
+                        (-1, p.right)
+                    }
+                  // output field is not forwarded from input
+                  case _: RexNode => (-1, p.right)
+                }
+              })
+              // filter all non-forwarded fields
+              .filter(_._1 >= 0)
+              // resolve names of input fields
+              .map(io => (inNames.get(io._1), io._2))
+
+            // filter by input keys
+            val outKeys = inOutNames.filter(io => keys.get.contains(io._1)).map(_._2)
+            // check if all keys have been preserved
+            if (outKeys.nonEmpty && outKeys.length == keys.get.length) {
+              // all keys have been preserved (but possibly renamed)
+              keys = Some(outKeys.toArray)
+            } else {
+              // some (or all) keys have been removed, so keys are no longer unique and are dropped
+              keys = None
+            }
+          }
+        case _: DataStreamOverAggregate =>
+          // keys are always forwarded by Over aggregate
+          super.visit(node, ordinal, parent)
+        case a: DataStreamGroupAggregate =>
+          // get grouping keys
+          val groupKeys = a.getRowType.getFieldNames.asScala.take(a.getGroupings.length)
+          keys = Some(groupKeys.toArray)
+        case w: DataStreamGroupWindowAggregate =>
+          // get grouping keys
+          val groupKeys =
+            w.getRowType.getFieldNames.asScala.take(w.getGroupings.length).toArray
+          // get window start and end time
+          val windowStartEnd = w.getWindowProperties.map(_.name)
+          // we have only a unique key if at least one window property is selected
+          if (windowStartEnd.nonEmpty) {
+            keys = Some(groupKeys ++ windowStartEnd)
+          }
+        case _: DataStreamRel =>
+          // anything else does not forward keys or might duplicate keys, so we can stop here
+          keys = None
+      }
+    }
+
+  }
+
+}
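
Note (not part of the patch): UniqueKeyExtractor derives keys bottom-up: a group aggregate establishes its grouping fields as unique keys, and a DataStreamCalc keeps them only if every key field is forwarded (possibly renamed). The following self-contained sketch models just that forwarding rule with made-up field names; it is a simplification, not the Calcite-based implementation above.

    // Sketch only: simplified key-forwarding check, mirroring the DataStreamCalc case.
    val inputKeys = Set("user")                            // keys derived from the input node
    val forwarded = Map("user" -> "u", "total" -> "total") // projection: input field -> output field

    val outKeys = inputKeys.flatMap(forwarded.get)
    // keys survive only if every input key is still present (possibly renamed) in the output
    val uniqueKeys: Option[Set[String]] =
      if (outKeys.size == inputKeys.size) Some(outKeys) else None
    // uniqueKeys == Some(Set("u")); dropping "user" from the projection would yield None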

http://git-wip-us.apache.org/repos/asf/flink/blob/ba6c59e6/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinITCase.scala
new file mode 100644
index 0000000..d4ff3f7
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinITCase.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit._
+
+import scala.collection.mutable
+
+class JoinITCase extends StreamingWithStateTestBase {
+
+  val data = List(
+    (1L, 1, "Hello"),
+    (2L, 2, "Hello"),
+    (3L, 3, "Hello"),
+    (4L, 4, "Hello"),
+    (5L, 5, "Hello"),
+    (6L, 6, "Hello"),
+    (7L, 7, "Hello World"),
+    (8L, 8, "Hello World"),
+    (20L, 20, "Hello World"))
+
+  /** test processing-time inner join **/
+  @Test
+  def testProcessTimeInnerJoin(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setStateBackend(getStateBackend)
+    StreamITCase.testResults = mutable.MutableList()
+    env.setParallelism(1)
+
+    val sqlQuery = "SELECT t2.a, t2.c, t1.c from T1 as t1 join T2 as t2 on 
t1.a = t2.a and " +
+      "t1.proctime between t2.proctime - interval '5' second and t2.proctime + 
interval '5' second"
+
+    val data1 = new mutable.MutableList[(Int, Long, String)]
+    data1.+=((1, 1L, "Hi1"))
+    data1.+=((1, 2L, "Hi2"))
+    data1.+=((1, 5L, "Hi3"))
+    data1.+=((2, 7L, "Hi5"))
+    data1.+=((1, 9L, "Hi6"))
+    data1.+=((1, 8L, "Hi8"))
+
+    val data2 = new mutable.MutableList[(Int, Long, String)]
+    data2.+=((1, 1L, "HiHi"))
+    data2.+=((2, 2L, "HeHe"))
+
+    val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+    val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+
+    tEnv.registerTable("T1", t1)
+    tEnv.registerTable("T2", t2)
+
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+  }
+
+  /** test processing-time inner join with an additional non-equi condition **/
+  @Test
+  def testProcessTimeInnerJoinWithOtherCondition(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setStateBackend(getStateBackend)
+    StreamITCase.testResults = mutable.MutableList()
+    env.setParallelism(1)
+
+    val sqlQuery = "SELECT t2.a, t2.c, t1.c from T1 as t1 join T2 as t2 on 
t1.a = t2.a and " +
+      "t1.proctime between t2.proctime - interval '5' second " +
+      "and t2.proctime + interval '5' second " +
+      "and t1.b > t2.b and t1.b + t2.b < 14"
+
+    val data1 = new mutable.MutableList[(String, Long, String)]
+    data1.+=(("1", 1L, "Hi1"))
+    data1.+=(("1", 2L, "Hi2"))
+    data1.+=(("1", 5L, "Hi3"))
+    data1.+=(("2", 7L, "Hi5"))
+    data1.+=(("1", 9L, "Hi6"))
+    data1.+=(("1", 8L, "Hi8"))
+
+    val data2 = new mutable.MutableList[(String, Long, String)]
+    data2.+=(("1", 5L, "HiHi"))
+    data2.+=(("2", 2L, "HeHe"))
+
+    val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+    val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+
+    tEnv.registerTable("T1", t1)
+    tEnv.registerTable("T2", t2)
+
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+  }
+
+}
+
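
Note (not part of the patch): in both queries above, the BETWEEN predicate admits a pair of rows when the left row's processing time lies within five seconds on either side of the right row's processing time. A minimal sketch of that predicate, assuming millisecond timestamps:

    // Sketch only: the window condition expressed directly, with bounds in milliseconds.
    def withinWindow(leftProctime: Long, rightProctime: Long): Boolean =
      leftProctime >= rightProctime - 5000L && leftProctime <= rightProctime + 5000L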

http://git-wip-us.apache.org/repos/asf/flink/blob/ba6c59e6/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinTest.scala
new file mode 100644
index 0000000..15e8b89
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinTest.scala
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala.stream.sql
+
+import org.apache.calcite.rel.logical.LogicalJoin
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.join.WindowJoinUtil
+import org.apache.flink.table.utils.TableTestUtil._
+import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
+import org.junit.Assert._
+import org.junit.Test
+
+class JoinTest extends TableTestBase {
+  private val streamUtil: StreamTableTestUtil = streamTestUtil()
+  streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c.rowtime, 'proctime.proctime)
+  streamUtil.addTable[(Int, String, Long)]("MyTable2", 'a, 'b, 'c.rowtime, 'proctime.proctime)
+
+  @Test
+  def testProcessingTimeInnerJoin() = {
+
+    val sqlQuery = "SELECT t1.a, t2.b " +
+      "FROM MyTable as t1 join MyTable2 as t2 on t1.a = t2.a and " +
+      "t1.proctime between t2.proctime - interval '1' hour and t2.proctime + 
interval '1' hour"
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        binaryNode(
+          "DataStreamWindowJoin",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "proctime")
+          ),
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(1),
+            term("select", "a", "b", "proctime")
+          ),
+          term("where",
+            "AND(=(a, a0), >=(proctime, -(proctime0, 3600000)), " +
+              "<=(proctime, DATETIME_PLUS(proctime0, 3600000)))"),
+          term("join", "a, proctime, a0, b, proctime0"),
+          term("joinType", "InnerJoin")
+        ),
+        term("select", "a", "b")
+      )
+
+    streamUtil.verifySql(sqlQuery, expected)
+  }
+
+  /** A windowed join requires time conditions. **/
+  @Test(expected = classOf[TableException])
+  def testWindowJoinUnExistTimeCondition() = {
+    val sql = "SELECT t2.a from MyTable as t1 join MyTable2 as t2 on t1.a = 
t2.a"
+    streamUtil.verifySql(sql, "n/a")
+  }
+
+  /** A windowed join requires exactly two time conditions. **/
+  @Test(expected = classOf[TableException])
+  def testWindowJoinSingleTimeCondition() = {
+    val sql = "SELECT t2.a from MyTable as t1 join MyTable2 as t2 on t1.a = 
t2.a" +
+      " and t1.proctime > t2.proctime - interval '5' second"
+    streamUtil.verifySql(sql, "n/a")
+  }
+
+  /** Both time attributes in a join condition must be of the same type **/
+  @Test(expected = classOf[TableException])
+  def testWindowJoinDiffTimeIndicator() = {
+    val sql = "SELECT t2.a from MyTable as t1 join MyTable2 as t2 on t1.a = 
t2.a" +
+      " and t1.proctime > t2.proctime - interval '5' second " +
+      " and t1.proctime < t2.c + interval '5' second"
+    streamUtil.verifySql(sql, "n/a")
+  }
+
+  /** The time conditions must be combined with AND. **/
+  @Test(expected = classOf[TableException])
+  def testWindowJoinNotCnfCondition() = {
+    val sql = "SELECT t2.a from MyTable as t1 join MyTable2 as t2 on t1.a = 
t2.a" +
+      " and (t1.proctime > t2.proctime - interval '5' second " +
+      " or t1.proctime < t2.c + interval '5' second)"
+    streamUtil.verifySql(sql, "n/a")
+  }
+
+  @Test
+  def testJoinTimeBoundary(): Unit = {
+    verifyTimeBoundary(
+      "t1.proctime between t2.proctime - interval '1' hour " +
+        "and t2.proctime + interval '1' hour",
+      -3600000,
+      3600000,
+      "proctime")
+
+    verifyTimeBoundary(
+      "t1.proctime > t2.proctime - interval '1' second and " +
+        "t1.proctime < t2.proctime + interval '1' second",
+      -999,
+      999,
+      "proctime")
+
+    verifyTimeBoundary(
+      "t1.c >= t2.c - interval '1' second and " +
+        "t1.c <= t2.c + interval '1' second",
+      -1000,
+      1000,
+      "rowtime")
+
+    verifyTimeBoundary(
+      "t1.c >= t2.c and " +
+        "t1.c <= t2.c + interval '1' second",
+      0,
+      1000,
+      "rowtime")
+
+    verifyTimeBoundary(
+      "t1.c >= t2.c + interval '1' second and " +
+        "t1.c <= t2.c + interval '10' second",
+      1000,
+      10000,
+      "rowtime")
+
+    verifyTimeBoundary(
+      "t2.c - interval '1' second <= t1.c and " +
+        "t2.c + interval '10' second >= t1.c",
+      -1000,
+      10000,
+      "rowtime")
+
+    verifyTimeBoundary(
+      "t1.c - interval '2' second >= t2.c + interval '1' second -" +
+        "interval '10' second and " +
+        "t1.c <= t2.c + interval '10' second",
+      -7000,
+      10000,
+      "rowtime")
+
+    verifyTimeBoundary(
+      "t1.c >= t2.c - interval '10' second and " +
+        "t1.c <= t2.c - interval '5' second",
+      -10000,
+      -5000,
+      "rowtime")
+  }
+
+  @Test
+  def testJoinRemainConditionConvert(): Unit = {
+    streamUtil.addTable[(Int, Long, Int)]("MyTable3", 'a, 'b.rowtime, 'c, 'proctime.proctime)
+    streamUtil.addTable[(Int, Long, Int)]("MyTable4", 'a, 'b.rowtime, 'c, 'proctime.proctime)
+    val query =
+      "SELECT t1.a, t2.c FROM MyTable3 as t1 join MyTable4 as t2 on t1.a = 
t2.a and " +
+        "t1.b >= t2.b - interval '10' second and t1.b <= t2.b - interval '5' 
second and " +
+        "t1.c > t2.c"
+    verifyRemainConditionConvert(
+      query,
+      ">($1, $3)")
+
+    val query1 =
+      "SELECT t1.a, t2.c FROM MyTable3 as t1 join MyTable4 as t2 on t1.a = 
t2.a and " +
+        "t1.b >= t2.b - interval '10' second and t1.b <= t2.b - interval '5' 
second "
+    verifyRemainConditionConvert(
+      query1,
+      "")
+
+    streamUtil.addTable[(Int, Long, Int)]("MyTable5", 'a, 'b, 'c, 'proctime.proctime)
+    streamUtil.addTable[(Int, Long, Int)]("MyTable6", 'a, 'b, 'c, 'proctime.proctime)
+    val query2 =
+      "SELECT t1.a, t2.c FROM MyTable5 as t1 join MyTable6 as t2 on t1.a = 
t2.a and " +
+        "t1.proctime >= t2.proctime - interval '10' second " +
+        "and t1.proctime <= t2.proctime - interval '5' second and " +
+        "t1.c > t2.c"
+    verifyRemainConditionConvert(
+      query2,
+      ">($2, $5)")
+  }
+
+  def verifyTimeBoundary(
+      timeSql: String,
+      expLeftSize: Long,
+      expRightSize: Long,
+      expTimeType: String) = {
+    val query =
+      "SELECT t1.a, t2.b FROM MyTable as t1 join MyTable2 as t2 on t1.a = t2.a 
and " + timeSql
+
+    val resultTable = streamUtil.tEnv.sql(query)
+    val relNode = resultTable.getRelNode
+    val joinNode = relNode.getInput(0).asInstanceOf[LogicalJoin]
+    val rexNode = joinNode.getCondition
+    val (isRowTime, lowerBound, upperBound, conditionWithoutTime) =
+      WindowJoinUtil.analyzeTimeBoundary(rexNode, 4, new RowSchema(joinNode.getRowType),
+        joinNode.getCluster.getRexBuilder, streamUtil.tEnv.getConfig)
+
+    val timeTypeStr =
+      if (isRowTime) "rowtime"
+      else  "proctime"
+    assertEquals(expLeftSize, lowerBound)
+    assertEquals(expRightSize, upperBound)
+    assertEquals(expTimeType, timeTypeStr)
+  }
+
+  def verifyRemainConditionConvert(
+      query: String,
+      expectCondStr: String) = {
+
+    val resultTable = streamUtil.tEnv.sql(query)
+    val relNode = resultTable.getRelNode
+    val joinNode = relNode.getInput(0).asInstanceOf[LogicalJoin]
+    val joinInfo = joinNode.analyzeCondition
+    val rexNode = joinInfo.getRemaining(joinNode.getCluster.getRexBuilder)
+    val (isRowTime, lowerBound, upperBound, remainCondition) =
+      WindowJoinUtil.analyzeTimeBoundary(rexNode, 4, new RowSchema(joinNode.getRowType),
+        joinNode.getCluster.getRexBuilder, streamUtil.tEnv.getConfig)
+
+    val actual: String = remainCondition.getOrElse("").toString
+
+    assertEquals(expectCondStr, actual)
+  }
+}
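
Note (not part of the patch): the expected values in testJoinTimeBoundary show how the bounds are normalized to milliseconds, with strict comparisons tightened by one millisecond. An illustrative calculation for the second case above:

    // Sketch only: boundary arithmetic implied by the expected values (-999, 999).
    val second = 1000L
    // t1.proctime > t2.proctime - interval '1' second  =>  strict lower bound tightened by 1 ms
    val lowerBound = -second + 1   // -999
    // t1.proctime < t2.proctime + interval '1' second  =>  strict upper bound tightened by 1 ms
    val upperBound = second - 1    //  999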

http://git-wip-us.apache.org/repos/asf/flink/blob/ba6c59e6/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
new file mode 100644
index 0000000..c008ed3
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.harness
+
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.lang.{Integer => JInt}
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.streaming.util.{KeyedTwoInputStreamOperatorTestHarness, TwoInputStreamOperatorTestHarness}
+import org.apache.flink.table.codegen.GeneratedFunction
+import org.apache.flink.table.runtime.harness.HarnessTestBase.{RowResultSortComparator, TupleRowKeySelector}
+import org.apache.flink.table.runtime.join.ProcTimeWindowInnerJoin
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.junit.Test
+
+
+class JoinHarnessTest extends HarnessTestBase {
+
+  private val rT = new RowTypeInfo(Array[TypeInformation[_]](
+    INT_TYPE_INFO,
+    STRING_TYPE_INFO),
+    Array("a", "b"))
+
+
+  val funcCode: String =
+    """
+      |public class TestJoinFunction
+      |          extends org.apache.flink.api.common.functions.RichFlatJoinFunction {
+      |  transient org.apache.flink.types.Row out =
+      |            new org.apache.flink.types.Row(4);
+      |  public TestJoinFunction() throws Exception {}
+      |
+      |  @Override
+      |  public void open(org.apache.flink.configuration.Configuration parameters)
+      |  throws Exception {}
+      |
+      |  @Override
+      |  public void join(Object _in1, Object _in2, org.apache.flink.util.Collector c)
+      |   throws Exception {
+      |   org.apache.flink.types.Row in1 = (org.apache.flink.types.Row) _in1;
+      |   org.apache.flink.types.Row in2 = (org.apache.flink.types.Row) _in2;
+      |
+      |   out.setField(0, in1.getField(0));
+      |   out.setField(1, in1.getField(1));
+      |   out.setField(2, in2.getField(0));
+      |   out.setField(3, in2.getField(1));
+      |
+      |   c.collect(out);
+      |
+      |  }
+      |
+      |  @Override
+      |  public void close() throws Exception {}
+      |}
+    """.stripMargin
+
+  /** a.proctime >= b.proctime - 10 and a.proctime <= b.proctime + 20 **/
+  @Test
+  def testNormalProcTimeJoin() {
+
+    val joinProcessFunc = new ProcTimeWindowInnerJoin(-10, 20, rT, rT, "TestJoinFunction", funcCode)
+
+    val operator: KeyedCoProcessOperator[Integer, CRow, CRow, CRow] =
+      new KeyedCoProcessOperator[Integer, CRow, CRow, CRow](joinProcessFunc)
+    val testHarness: KeyedTwoInputStreamOperatorTestHarness[Integer, CRow, CRow, CRow] =
+      new KeyedTwoInputStreamOperatorTestHarness[Integer, CRow, CRow, CRow](
+       operator,
+       new TupleRowKeySelector[Integer](0),
+       new TupleRowKeySelector[Integer](0),
+       BasicTypeInfo.INT_TYPE_INFO,
+       1,1,0)
+
+    testHarness.open()
+
+    // left stream input
+    testHarness.setProcessingTime(1)
+    testHarness.processElement1(new StreamRecord(
+      CRow(Row.of(1: JInt, "aaa"), true), 1))
+    assert(testHarness.numProcessingTimeTimers() == 1)
+    testHarness.setProcessingTime(2)
+    testHarness.processElement1(new StreamRecord(
+      CRow(Row.of(2: JInt, "bbb"), true), 2))
+    assert(testHarness.numProcessingTimeTimers() == 2)
+    testHarness.setProcessingTime(3)
+    testHarness.processElement1(new StreamRecord(
+      CRow(Row.of(1: JInt, "aaa2"), true), 3))
+    assert(testHarness.numKeyedStateEntries() == 4)
+    assert(testHarness.numProcessingTimeTimers() == 2)
+
+    // right stream input and output normally
+    testHarness.processElement2(new StreamRecord(
+      CRow(Row.of(1: JInt, "Hi1"), true), 3))
+    testHarness.setProcessingTime(4)
+    testHarness.processElement2(new StreamRecord(
+      CRow(Row.of(2: JInt, "Hello1"), true), 4))
+    assert(testHarness.numKeyedStateEntries() == 8)
+    assert(testHarness.numProcessingTimeTimers() == 4)
+
+    // expired left stream record at timestamp 1
+    testHarness.setProcessingTime(12)
+    assert(testHarness.numKeyedStateEntries() == 8)
+    assert(testHarness.numProcessingTimeTimers() == 4)
+    testHarness.processElement2(new StreamRecord(
+      CRow(Row.of(1: JInt, "Hi2"), true), 12))
+
+    // expired right stream record at timestamp 4 and all left stream
+    testHarness.setProcessingTime(25)
+    assert(testHarness.numKeyedStateEntries() == 2)
+    assert(testHarness.numProcessingTimeTimers() == 1)
+    testHarness.processElement1(new StreamRecord(
+      CRow(Row.of(1: JInt, "aaa3"), true), 25))
+    testHarness.processElement1(new StreamRecord(
+      CRow(Row.of(2: JInt, "bbb2"), true), 25))
+    testHarness.processElement2(new StreamRecord(
+      CRow(Row.of(2: JInt, "Hello2"), true), 25))
+
+    testHarness.setProcessingTime(45)
+    assert(testHarness.numKeyedStateEntries() > 0)
+    testHarness.setProcessingTime(46)
+    assert(testHarness.numKeyedStateEntries() == 0)
+    val result = testHarness.getOutput
+
+    val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(1: JInt, "aaa", 1: JInt, "Hi1"), true), 3))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(1: JInt, "aaa2", 1: JInt, "Hi1"), true), 3))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(2: JInt, "bbb", 2: JInt, "Hello1"), true), 4))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(1: JInt, "aaa2", 1: JInt, "Hi2"), true), 12))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(1: JInt, "aaa3", 1: JInt, "Hi2"), true), 25))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(2: JInt, "bbb2", 2: JInt, "Hello2"), true), 25))
+
+    verify(expectedOutput, result, new RowResultSortComparator(6))
+
+    testHarness.close()
+  }
+
+  /** a.proctime >= b.proctime - 10 and a.proctime <= b.proctime - 5 **/
+  @Test
+  def testProcTimeJoinSingleNeedStore() {
+
+    val joinProcessFunc = new ProcTimeWindowInnerJoin(-10, -5, rT, rT, "TestJoinFunction", funcCode)
+
+    val operator: KeyedCoProcessOperator[Integer, CRow, CRow, CRow] =
+      new KeyedCoProcessOperator[Integer, CRow, CRow, CRow](joinProcessFunc)
+    val testHarness: KeyedTwoInputStreamOperatorTestHarness[Integer, CRow, CRow, CRow] =
+      new KeyedTwoInputStreamOperatorTestHarness[Integer, CRow, CRow, CRow](
+        operator,
+        new TupleRowKeySelector[Integer](0),
+        new TupleRowKeySelector[Integer](0),
+        BasicTypeInfo.INT_TYPE_INFO,
+        1,1,0)
+
+    testHarness.open()
+
+    testHarness.setProcessingTime(1)
+    testHarness.processElement1(new StreamRecord(
+      CRow(Row.of(1: JInt, "aaa1"), true), 1))
+    testHarness.setProcessingTime(2)
+    testHarness.processElement1(new StreamRecord(
+      CRow(Row.of(2: JInt, "aaa2"), true), 2))
+    testHarness.setProcessingTime(3)
+    testHarness.processElement1(new StreamRecord(
+      CRow(Row.of(1: JInt, "aaa3"), true), 3))
+    assert(testHarness.numKeyedStateEntries() == 4)
+    assert(testHarness.numProcessingTimeTimers() == 2)
+
+    // Do not store b elements
+    // a.proctime <= b.proctime - 5 is not met yet
+    testHarness.processElement2(new StreamRecord(
+      CRow(Row.of(1: JInt, "bbb3"), true), 3))
+    assert(testHarness.numKeyedStateEntries() == 4)
+    assert(testHarness.numProcessingTimeTimers() == 2)
+
+    // a.proctime <= b.proctime - 5 is now met
+    testHarness.setProcessingTime(7)
+    testHarness.processElement2(new StreamRecord(
+      CRow(Row.of(2: JInt, "bbb7"), true), 7))
+    assert(testHarness.numKeyedStateEntries() == 4)
+    assert(testHarness.numProcessingTimeTimers() == 2)
+
+    // expire record of stream a at timestamp 1
+    testHarness.setProcessingTime(12)
+    assert(testHarness.numKeyedStateEntries() == 4)
+    assert(testHarness.numProcessingTimeTimers() == 2)
+    testHarness.processElement2(new StreamRecord(
+      CRow(Row.of(1: JInt, "bbb12"), true), 12))
+
+    testHarness.setProcessingTime(13)
+    assert(testHarness.numKeyedStateEntries() == 2)
+    assert(testHarness.numProcessingTimeTimers() == 1)
+
+    testHarness.setProcessingTime(14)
+    assert(testHarness.numKeyedStateEntries() == 0)
+    assert(testHarness.numProcessingTimeTimers() == 0)
+    val result = testHarness.getOutput
+
+    val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(2: JInt, "aaa2", 2: JInt, "bbb7"), true), 7))
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(1: JInt, "aaa3", 1: JInt, "bbb12"), true), 12))
+
+    verify(expectedOutput, result, new RowResultSortComparator(6))
+
+    testHarness.close()
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ba6c59e6/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
index 10c79d0..b0500ca 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 
@@ -123,4 +124,12 @@ public class KeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT>
 
                super.initializeState(operatorStateHandles);
        }
+
+       public int numKeyedStateEntries() {
+               if (keyedStateBackend instanceof HeapKeyedStateBackend) {
+                       return ((HeapKeyedStateBackend) keyedStateBackend).numStateEntries();
+               } else {
+                       throw new UnsupportedOperationException();
+               }
+       }
 }
