[FLINK-6233] [table] Add more tests for rowtime window join + minor refactoring.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1ea7f49a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1ea7f49a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1ea7f49a

Branch: refs/heads/master
Commit: 1ea7f49a5030ae481122d34915ca14d30b8626f5
Parents: 655d8b1
Author: Fabian Hueske <[email protected]>
Authored: Tue Oct 10 14:48:24 2017 +0200
Committer: Fabian Hueske <[email protected]>
Committed: Tue Oct 10 23:09:07 2017 +0200

----------------------------------------------------------------------
 .../nodes/datastream/DataStreamWindowJoin.scala |  16 +-
 .../datastream/DataStreamWindowJoinRule.scala   |  17 +-
 .../join/ProcTimeBoundedStreamInnerJoin.scala   |  17 +-
 .../join/RowTimeBoundedStreamInnerJoin.scala    |  18 +-
 .../join/TimeBoundedStreamInnerJoin.scala       |  38 ++--
 .../table/runtime/join/WindowJoinUtil.scala     |  21 --
 .../flink/table/api/stream/sql/JoinTest.scala   |  93 ++++++++-
 .../table/runtime/harness/HarnessTestBase.scala |  20 ++
 .../table/runtime/harness/JoinHarnessTest.scala |  53 +++--
 .../table/runtime/stream/sql/JoinITCase.scala   | 209 ++++++++++++++-----
 10 files changed, 368 insertions(+), 134 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1ea7f49a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
index 9358aa3..3e23006 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
@@ -136,6 +136,11 @@ class DataStreamWindowJoin(
       remainCondition,
       ruleDescription)
 
+    val joinOpName =
+      s"where: (" +
+        s"${joinConditionToString(schema.relDataType, joinCondition, getExpressionString)}), " +
+        s"join: (${joinSelectionToString(schema.relDataType)})"
+
     joinType match {
       case JoinRelType.INNER =>
         if (relativeWindowSize < 0) {
@@ -148,6 +153,7 @@ class DataStreamWindowJoin(
               leftDataStream,
               rightDataStream,
               returnTypeInfo,
+              joinOpName,
               joinFunction.name,
               joinFunction.code,
               leftKeys,
@@ -158,6 +164,7 @@ class DataStreamWindowJoin(
               leftDataStream,
               rightDataStream,
               returnTypeInfo,
+              joinOpName,
               joinFunction.name,
               joinFunction.code,
               leftKeys,
@@ -202,6 +209,7 @@ class DataStreamWindowJoin(
       leftDataStream: DataStream[CRow],
       rightDataStream: DataStream[CRow],
       returnTypeInfo: TypeInformation[CRow],
+      operatorName: String,
       joinFunctionName: String,
       joinFunctionCode: String,
       leftKeys: Array[Int],
@@ -210,7 +218,6 @@ class DataStreamWindowJoin(
     val procInnerJoinFunc = new ProcTimeBoundedStreamInnerJoin(
       leftLowerBound,
       leftUpperBound,
-      allowedLateness = 0L,
       leftSchema.typeInfo,
       rightSchema.typeInfo,
       joinFunctionName,
@@ -220,6 +227,7 @@ class DataStreamWindowJoin(
       leftDataStream.connect(rightDataStream)
         .keyBy(leftKeys, rightKeys)
         .process(procInnerJoinFunc)
+        .name(operatorName)
         .returns(returnTypeInfo)
     } else {
       leftDataStream.connect(rightDataStream)
@@ -227,6 +235,7 @@ class DataStreamWindowJoin(
         .process(procInnerJoinFunc)
         .setParallelism(1)
         .setMaxParallelism(1)
+        .name(operatorName)
         .returns(returnTypeInfo)
     }
   }
@@ -235,6 +244,7 @@ class DataStreamWindowJoin(
       leftDataStream: DataStream[CRow],
       rightDataStream: DataStream[CRow],
       returnTypeInfo: TypeInformation[CRow],
+      operatorName: String,
       joinFunctionName: String,
       joinFunctionCode: String,
       leftKeys: Array[Int],
@@ -256,7 +266,7 @@ class DataStreamWindowJoin(
         .connect(rightDataStream)
         .keyBy(leftKeys, rightKeys)
         .transform(
-          "InnerRowtimeWindowJoin",
+          operatorName,
           returnTypeInfo,
           new KeyedCoProcessOperatorWithWatermarkDelay[Tuple, CRow, CRow, CRow](
             rowTimeInnerJoinFunc,
@@ -266,7 +276,7 @@ class DataStreamWindowJoin(
       leftDataStream.connect(rightDataStream)
         .keyBy(new NullByteKeySelector[CRow](), new NullByteKeySelector[CRow])
         .transform(
-          "InnerRowtimeWindowJoin",
+          operatorName,
           returnTypeInfo,
           new KeyedCoProcessOperatorWithWatermarkDelay[java.lang.Byte, CRow, CRow, CRow](
             rowTimeInnerJoinFunc,

http://git-wip-us.apache.org/repos/asf/flink/blob/1ea7f49a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala
index d208d2b..a7358c7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala
@@ -41,29 +41,22 @@ class DataStreamWindowJoinRule
   override def matches(call: RelOptRuleCall): Boolean = {
     val join: FlinkLogicalJoin = call.rel(0).asInstanceOf[FlinkLogicalJoin]
 
-    val (windowBounds, remainingPreds) = WindowJoinUtil.extractWindowBoundsFromPredicate(
+    val (windowBounds, _) = WindowJoinUtil.extractWindowBoundsFromPredicate(
       join.getCondition,
       join.getLeft.getRowType.getFieldCount,
       join.getRowType,
       join.getCluster.getRexBuilder,
       TableConfig.DEFAULT)
 
-    // remaining predicate must not access time attributes
-    val remainingPredsAccessTime = remainingPreds.isDefined &&
-      WindowJoinUtil.accessesTimeAttribute(remainingPreds.get, join.getRowType)
-
     if (windowBounds.isDefined) {
       if (windowBounds.get.isEventTime) {
-        !remainingPredsAccessTime
+        true
       } else {
-        // Check that no event-time attributes are in the input.
-        // The proc-time join implementation does ensure that record timestamp are correctly set.
-        // It is always the timestamp of the later arriving record.
+        // Check that no event-time attributes are in the input because the processing time window
+        // join does not correctly hold back watermarks.
         // We rely on projection pushdown to remove unused attributes before the join.
-        val rowTimeAttrInOutput = join.getRowType.getFieldList.asScala
+        !join.getRowType.getFieldList.asScala
           .exists(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
-
-        !remainingPredsAccessTime && !rowTimeAttrInOutput
       }
     } else {
       // the given join does not have valid window bounds. We cannot translate it.

http://git-wip-us.apache.org/repos/asf/flink/blob/1ea7f49a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeBoundedStreamInnerJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeBoundedStreamInnerJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeBoundedStreamInnerJoin.scala
index ab5a9c3..3bac42c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeBoundedStreamInnerJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeBoundedStreamInnerJoin.scala
@@ -29,19 +29,18 @@ import org.apache.flink.types.Row
 final class ProcTimeBoundedStreamInnerJoin(
     leftLowerBound: Long,
     leftUpperBound: Long,
-    allowedLateness: Long,
     leftType: TypeInformation[Row],
     rightType: TypeInformation[Row],
     genJoinFuncName: String,
     genJoinFuncCode: String)
-    extends TimeBoundedStreamInnerJoin(
-      leftLowerBound,
-      leftUpperBound,
-      allowedLateness,
-      leftType,
-      rightType,
-      genJoinFuncName,
-      genJoinFuncCode) {
+  extends TimeBoundedStreamInnerJoin(
+    leftLowerBound,
+    leftUpperBound,
+    allowedLateness = 0L,
+    leftType,
+    rightType,
+    genJoinFuncName,
+    genJoinFuncCode) {
 
   override def updateOperatorTime(ctx: CoProcessFunction[CRow, CRow, CRow]#Context): Unit = {
     leftOperatorTime = ctx.timerService().currentProcessingTime()

http://git-wip-us.apache.org/repos/asf/flink/blob/1ea7f49a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/RowTimeBoundedStreamInnerJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/RowTimeBoundedStreamInnerJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/RowTimeBoundedStreamInnerJoin.scala
index 5cf5a53..a2d9dca 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/RowTimeBoundedStreamInnerJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/RowTimeBoundedStreamInnerJoin.scala
@@ -36,18 +36,18 @@ final class RowTimeBoundedStreamInnerJoin(
     genJoinFuncCode: String,
     leftTimeIdx: Int,
     rightTimeIdx: Int)
-    extends TimeBoundedStreamInnerJoin(
-      leftLowerBound,
-      leftUpperBound,
-      allowedLateness,
-      leftType,
-      rightType,
-      genJoinFuncName,
-      genJoinFuncCode) {
+  extends TimeBoundedStreamInnerJoin(
+    leftLowerBound,
+    leftUpperBound,
+    allowedLateness,
+    leftType,
+    rightType,
+    genJoinFuncName,
+    genJoinFuncCode) {
 
   /**
     * Get the maximum interval between receiving a row and emitting it (as part of a joined result).
-    * Only reasonable for row time join.
+    * This is the time interval by which watermarks need to be held back.
     *
     * @return the maximum delay for the outputs
     */

http://git-wip-us.apache.org/repos/asf/flink/blob/1ea7f49a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala
index 7bf3d33..9625eac 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala
@@ -38,15 +38,16 @@ import org.apache.flink.util.Collector
 /**
   * A CoProcessFunction to execute time-bounded stream inner-join.
   * Two kinds of time criteria:
-  * "L.time between R.time + X and R.time + Y" or "R.time between L.time - Y and L.time - X".
+  * "L.time between R.time + X and R.time + Y" or "R.time between L.time - Y and L.time - X" where
+  * X and Y might be negative or positive and X <= Y.
   *
   * @param leftLowerBound  the lower bound for the left stream (X in the criteria)
   * @param leftUpperBound  the upper bound for the left stream (Y in the criteria)
   * @param allowedLateness the lateness allowed for the two streams
   * @param leftType        the input type of left stream
   * @param rightType       the input type of right stream
-  * @param genJoinFuncName the function code of other non-equi conditions
-  * @param genJoinFuncCode the function name of other non-equi conditions
+  * @param genJoinFuncName the name of the generated function
+  * @param genJoinFuncCode the code of function to evaluate the non-window join conditions
   *
   */
 abstract class TimeBoundedStreamInnerJoin(
@@ -57,9 +58,9 @@ abstract class TimeBoundedStreamInnerJoin(
     private val rightType: TypeInformation[Row],
     private val genJoinFuncName: String,
     private val genJoinFuncCode: String)
-    extends CoProcessFunction[CRow, CRow, CRow]
-    with Compiler[FlatJoinFunction[Row, Row, Row]]
-    with Logging {
+  extends CoProcessFunction[CRow, CRow, CRow]
+  with Compiler[FlatJoinFunction[Row, Row, Row]]
+  with Logging {
 
   private var cRowWrapper: CRowWrappingCollector = _
 
@@ -79,15 +80,16 @@ abstract class TimeBoundedStreamInnerJoin(
   protected val leftRelativeSize: Long = -leftLowerBound
   protected val rightRelativeSize: Long = leftUpperBound
 
+  // Points in time until which the respective cache has been cleaned.
   private var leftExpirationTime: Long = 0L
   private var rightExpirationTime: Long = 0L
 
+  // Current time on the respective input stream.
   protected var leftOperatorTime: Long = 0L
   protected var rightOperatorTime: Long = 0L
 
-
-  // for delayed cleanup
-  private val cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2
+  // Minimum interval by which state is cleaned up
+  private val minCleanUpInterval = (leftRelativeSize + rightRelativeSize) / 2
 
   if (allowedLateness < 0) {
     throw new IllegalArgumentException("The allowed lateness must be non-negative.")
@@ -140,12 +142,14 @@ abstract class TimeBoundedStreamInnerJoin(
       cRowValue: CRow,
       ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
       out: Collector[CRow]): Unit = {
+
     updateOperatorTime(ctx)
     val leftRow = cRowValue.row
     val timeForLeftRow: Long = getTimeForLeftStream(ctx, leftRow)
     val rightQualifiedLowerBound: Long = timeForLeftRow - rightRelativeSize
     val rightQualifiedUpperBound: Long = timeForLeftRow + leftRelativeSize
     cRowWrapper.out = out
+
     // Check if we need to cache the current row.
     if (rightOperatorTime < rightQualifiedUpperBound) {
       // Operator time of right stream has not exceeded the upper window bound of the current
@@ -164,7 +168,7 @@ abstract class TimeBoundedStreamInnerJoin(
     }
     // Check if we need to join the current row against cached rows of the right input.
     // The condition here should be rightMinimumTime < rightQualifiedUpperBound.
-    // I use rightExpirationTime as an approximation of the rightMinimumTime here,
+    // We use rightExpirationTime as an approximation of the rightMinimumTime here,
     // since rightExpirationTime <= rightMinimumTime is always true.
     if (rightExpirationTime < rightQualifiedUpperBound) {
       // Upper bound of current join window has not passed the cache expiration time yet.
@@ -199,12 +203,14 @@ abstract class TimeBoundedStreamInnerJoin(
       cRowValue: CRow,
       ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
       out: Collector[CRow]): Unit = {
+
     updateOperatorTime(ctx)
     val rightRow = cRowValue.row
     val timeForRightRow: Long = getTimeForRightStream(ctx, rightRow)
     val leftQualifiedLowerBound: Long = timeForRightRow - leftRelativeSize
     val leftQualifiedUpperBound: Long =  timeForRightRow + rightRelativeSize
     cRowWrapper.out = out
+
     // Check if we need to cache the current row.
     if (leftOperatorTime < leftQualifiedUpperBound) {
       // Operator time of left stream has not exceeded the upper window bound of the current
@@ -223,7 +229,7 @@ abstract class TimeBoundedStreamInnerJoin(
     }
     // Check if we need to join the current row against cached rows of the left input.
     // The condition here should be leftMinimumTime < leftQualifiedUpperBound.
-    // I use leftExpirationTime as an approximation of the leftMinimumTime here,
+    // We use leftExpirationTime as an approximation of the leftMinimumTime here,
     // since leftExpirationTime <= leftMinimumTime is always true.
     if (leftExpirationTime < leftQualifiedUpperBound) {
       leftExpirationTime = calExpirationTime(rightOperatorTime, leftRelativeSize)
@@ -261,6 +267,7 @@ abstract class TimeBoundedStreamInnerJoin(
       timestamp: Long,
       ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext,
       out: Collector[CRow]): Unit = {
+
     updateOperatorTime(ctx)
     // In the future, we should separate the left and right watermarks. Otherwise, the
     // registered timer of the faster stream will be delayed, even if the watermarks have
@@ -316,11 +323,11 @@ abstract class TimeBoundedStreamInnerJoin(
       rowTime: Long,
       leftRow: Boolean): Unit = {
     if (leftRow) {
-      val cleanupTime = rowTime + leftRelativeSize + cleanupDelay + allowedLateness + 1
+      val cleanupTime = rowTime + leftRelativeSize + minCleanUpInterval + allowedLateness + 1
       registerTimer(ctx, cleanupTime)
       rightTimerState.update(cleanupTime)
     } else {
-      val cleanupTime = rowTime + rightRelativeSize + cleanupDelay + allowedLateness + 1
+      val cleanupTime = rowTime + rightRelativeSize + minCleanUpInterval + allowedLateness + 1
       registerTimer(ctx, cleanupTime)
       leftTimerState.update(cleanupTime)
     }
@@ -361,6 +368,7 @@ abstract class TimeBoundedStreamInnerJoin(
         }
       }
     }
+
     if (earliestTimestamp > 0) {
       // There are rows left in the cache. Register a timer to expire them later.
       registerCleanUpTimer(
@@ -385,6 +393,8 @@ abstract class TimeBoundedStreamInnerJoin(
   /**
     * Return the time for the target row from the left stream.
     *
+    * Requires that [[updateOperatorTime()]] has been called before.
+    *
     * @param context the runtime context
     * @param row     the target row
     * @return time for the target row
@@ -394,6 +404,8 @@ abstract class TimeBoundedStreamInnerJoin(
   /**
     * Return the time for the target row from the right stream.
     *
+    * Requires that [[updateOperatorTime()]] has been called before.
+    *
     * @param context the runtime context
     * @param row     the target row
     * @return time for the target row

http://git-wip-us.apache.org/repos/asf/flink/blob/1ea7f49a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
index 6f97f2a..863f342 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
@@ -266,27 +266,6 @@ object WindowJoinUtil {
   }
 
   /**
-    * Checks if an expression accesses a time attribute.
-    *
-    * @param expr The expression to check.
-    * @param inputType The input type of the expression.
-    * @return True, if the expression accesses a time attribute. False otherwise.
-    */
-  def accessesTimeAttribute(expr: RexNode, inputType: RelDataType): Boolean = {
-    expr match {
-      case i: RexInputRef =>
-        val accessedType = inputType.getFieldList.get(i.getIndex).getType
-        accessedType match {
-          case _: TimeIndicatorRelDataType => true
-          case _ => false
-        }
-      case c: RexCall =>
-        c.operands.asScala.exists(accessesTimeAttribute(_, inputType))
-      case _ => false
-    }
-  }
-
-  /**
     * Checks if an expression accesses a non-time attribute.
     *
     * @param expr The expression to check.

http://git-wip-us.apache.org/repos/asf/flink/blob/1ea7f49a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
index a4234c5..53aff82 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
@@ -20,8 +20,9 @@ package org.apache.flink.table.api.stream.sql
 import org.apache.calcite.rel.logical.LogicalJoin
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.scala._
+import org.apache.flink.table.plan.logical.TumblingGroupWindow
 import org.apache.flink.table.runtime.join.WindowJoinUtil
-import org.apache.flink.table.utils.TableTestUtil._
+import org.apache.flink.table.utils.TableTestUtil.{term, _}
 import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
 import org.junit.Assert._
 import org.junit.Test
@@ -184,6 +185,96 @@ class JoinTest extends TableTestBase {
   }
 
   @Test
+  def testRowTimeInnerJoinAndWindowAggregationOnFirst(): Unit = {
+
+    val sqlQuery =
+      """
+        |SELECT t1.b, SUM(t2.a) AS aSum, COUNT(t2.b) AS bCnt
+        |FROM MyTable t1, MyTable2 t2
+        |WHERE t1.a = t2.a AND
+        |  t1.c BETWEEN t2.c - INTERVAL '10' MINUTE AND t2.c + INTERVAL '1' HOUR
+        |GROUP BY TUMBLE(t1.c, INTERVAL '6' HOUR), t1.b
+        |""".stripMargin
+
+    val expected =
+      unaryNode(
+        "DataStreamGroupWindowAggregate",
+        unaryNode(
+          "DataStreamCalc",
+          binaryNode(
+            "DataStreamWindowJoin",
+            unaryNode(
+              "DataStreamCalc",
+              streamTableNode(0),
+              term("select", "a", "b", "c")
+            ),
+            unaryNode(
+              "DataStreamCalc",
+              streamTableNode(1),
+              term("select", "a", "b", "c")
+            ),
+            term("where",
+              "AND(=(a, a0), >=(c, -(c0, 600000)), " +
+                "<=(c, DATETIME_PLUS(c0, 3600000)))"),
+            term("join", "a, b, c, a0, b0, c0"),
+            term("joinType", "InnerJoin")
+          ),
+          term("select", "c", "b", "a0", "b0")
+        ),
+        term("groupBy", "b"),
+        term("window", TumblingGroupWindow('w$, 'c, 21600000.millis)),
+        term("select", "b", "SUM(a0) AS aSum", "COUNT(b0) AS bCnt")
+      )
+
+    streamUtil.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testRowTimeInnerJoinAndWindowAggregationOnSecond(): Unit = {
+
+    val sqlQuery =
+      """
+        |SELECT t2.b, SUM(t1.a) AS aSum, COUNT(t1.b) AS bCnt
+        |FROM MyTable t1, MyTable2 t2
+        |WHERE t1.a = t2.a AND
+        |  t1.c BETWEEN t2.c - INTERVAL '10' MINUTE AND t2.c + INTERVAL '1' HOUR
+        |GROUP BY TUMBLE(t2.c, INTERVAL '6' HOUR), t2.b
+        |""".stripMargin
+
+    val expected =
+      unaryNode(
+        "DataStreamGroupWindowAggregate",
+        unaryNode(
+          "DataStreamCalc",
+          binaryNode(
+            "DataStreamWindowJoin",
+            unaryNode(
+              "DataStreamCalc",
+              streamTableNode(0),
+              term("select", "a", "b", "c")
+            ),
+            unaryNode(
+              "DataStreamCalc",
+              streamTableNode(1),
+              term("select", "a", "b", "c")
+            ),
+            term("where",
+              "AND(=(a, a0), >=(c, -(c0, 600000)), " +
+                "<=(c, DATETIME_PLUS(c0, 3600000)))"),
+            term("join", "a, b, c, a0, b0, c0"),
+            term("joinType", "InnerJoin")
+          ),
+          term("select", "c0", "b0", "a", "b")
+        ),
+        term("groupBy", "b0"),
+        term("window", TumblingGroupWindow('w$, 'c0, 21600000.millis)),
+        term("select", "b0", "SUM(a) AS aSum", "COUNT(b) AS bCnt")
+      )
+
+    streamUtil.verifySql(sqlQuery, expected)
+  }
+
+  @Test
   def testJoinTimeBoundary(): Unit = {
     verifyTimeBoundary(
       "t1.proctime between t2.proctime - interval '1' hour " +

http://git-wip-us.apache.org/repos/asf/flink/blob/1ea7f49a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
index 67164b7..942846c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
@@ -355,6 +355,26 @@ object HarnessTestBase {
   }
 
   /**
+    * Return 0 for equal Rows and non zero for different rows
+    */
+  class RowResultSortComparatorWithWatermarks()
+    extends Comparator[Object] with Serializable {
+
+    override def compare(o1: Object, o2: Object): Int = {
+
+      (o1, o2) match {
+        case (w1: Watermark, w2: Watermark) =>
+          w1.getTimestamp.compareTo(w2.getTimestamp)
+        case (r1: StreamRecord[CRow], r2: StreamRecord[CRow]) =>
+          r1.getValue.toString.compareTo(r2.getValue.toString)
+        case (_: Watermark, _: StreamRecord[CRow]) => -1
+        case (_: StreamRecord[CRow], _: Watermark) => 1
+        case _ => -1
+      }
+    }
+  }
+
+  /**
     * Tuple row key selector that returns a specified field as the selector function
     */
   class TupleRowKeySelector[T](

http://git-wip-us.apache.org/repos/asf/flink/blob/1ea7f49a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
index 192befd..43397ae 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
@@ -25,11 +25,12 @@ import org.apache.flink.streaming.api.watermark.Watermark
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
 import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness
 import org.apache.flink.table.api.Types
-import org.apache.flink.table.runtime.harness.HarnessTestBase.{RowResultSortComparator, TupleRowKeySelector}
+import org.apache.flink.table.runtime.harness.HarnessTestBase.{RowResultSortComparator, RowResultSortComparatorWithWatermarks, TupleRowKeySelector}
 import org.apache.flink.table.runtime.join.{ProcTimeBoundedStreamInnerJoin, RowTimeBoundedStreamInnerJoin}
+import org.apache.flink.table.runtime.operators.KeyedCoProcessOperatorWithWatermarkDelay
 import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.types.Row
-import org.junit.Assert.{assertEquals}
+import org.junit.Assert.assertEquals
 import org.junit.Test
 
 class JoinHarnessTest extends HarnessTestBase {
@@ -75,7 +76,7 @@ class JoinHarnessTest extends HarnessTestBase {
   def testProcTimeJoinWithCommonBounds() {
 
     val joinProcessFunc = new ProcTimeBoundedStreamInnerJoin(
-      -10, 20, 0, rowType, rowType, "TestJoinFunction", funcCode)
+      -10, 20, rowType, rowType, "TestJoinFunction", funcCode)
 
     val operator: KeyedCoProcessOperator[Integer, CRow, CRow, CRow] =
       new KeyedCoProcessOperator[Integer, CRow, CRow, CRow](joinProcessFunc)
@@ -165,7 +166,7 @@ class JoinHarnessTest extends HarnessTestBase {
   def testProcTimeJoinWithNegativeBounds() {
 
     val joinProcessFunc = new ProcTimeBoundedStreamInnerJoin(
-      -10, -5, 0, rowType, rowType, "TestJoinFunction", funcCode)
+      -10, -5, rowType, rowType, "TestJoinFunction", funcCode)
 
     val operator: KeyedCoProcessOperator[Integer, CRow, CRow, CRow] =
       new KeyedCoProcessOperator[Integer, CRow, CRow, CRow](joinProcessFunc)
@@ -250,7 +251,9 @@ class JoinHarnessTest extends HarnessTestBase {
       -10, 20, 0, rowType, rowType, "TestJoinFunction", funcCode, 0, 0)
 
     val operator: KeyedCoProcessOperator[String, CRow, CRow, CRow] =
-      new KeyedCoProcessOperator[String, CRow, CRow, CRow](joinProcessFunc)
+      new KeyedCoProcessOperatorWithWatermarkDelay[String, CRow, CRow, CRow](
+        joinProcessFunc,
+        joinProcessFunc.getMaxOutputDelay)
     val testHarness: KeyedTwoInputStreamOperatorTestHarness[String, CRow, CRow, CRow] =
       new KeyedTwoInputStreamOperatorTestHarness[String, CRow, CRow, CRow](
         operator,
@@ -312,23 +315,31 @@ class JoinHarnessTest extends HarnessTestBase {
     assertEquals(4, testHarness.numKeyedStateEntries())
 
     val expectedOutput = new ConcurrentLinkedQueue[Object]()
+    expectedOutput.add(new Watermark(-19))
+    // This result is produced by the late row (1, "k1").
+    expectedOutput.add(new StreamRecord(
+      CRow(Row.of(1L: JLong, "k1", 2L: JLong, "k1"), true), 0))
     expectedOutput.add(new StreamRecord(
       CRow(Row.of(2L: JLong, "k1", 2L: JLong, "k1"), true), 0))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(5L: JLong, "k1", 2L: JLong, "k1"), true), 0))
+        CRow(Row.of(5L: JLong, "k1", 2L: JLong, "k1"), true), 0))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(5L: JLong, "k1", 15L: JLong, "k1"), true), 0))
+        CRow(Row.of(5L: JLong, "k1", 15L: JLong, "k1"), true), 0))
+    expectedOutput.add(new Watermark(0))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(35L: JLong, "k1", 15L: JLong, "k1"), true), 0))
+        CRow(Row.of(35L: JLong, "k1", 15L: JLong, "k1"), true), 0))
+    expectedOutput.add(new Watermark(18))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(40L: JLong, "k2", 39L: JLong, "k2"), true), 0))
-
-    // This result is produced by the late row (1, "k1").
-    expectedOutput.add(new StreamRecord(
-      CRow(Row.of(1L: JLong, "k1", 2L: JLong, "k1"), true), 0))
+        CRow(Row.of(40L: JLong, "k2", 39L: JLong, "k2"), true), 0))
+    expectedOutput.add(new Watermark(41))
 
     val result = testHarness.getOutput
-    verify(expectedOutput, result, new RowResultSortComparator())
+    println(result)
+    verify(
+      expectedOutput,
+      result,
+      new RowResultSortComparatorWithWatermarks(),
+      checkWaterMark = true)
     testHarness.close()
   }
 
@@ -340,7 +351,9 @@ class JoinHarnessTest extends HarnessTestBase {
       -10, -7, 0, rowType, rowType, "TestJoinFunction", funcCode, 0, 0)
 
     val operator: KeyedCoProcessOperator[String, CRow, CRow, CRow] =
-      new KeyedCoProcessOperator[String, CRow, CRow, CRow](joinProcessFunc)
+      new KeyedCoProcessOperatorWithWatermarkDelay[String, CRow, CRow, CRow](
+        joinProcessFunc,
+        joinProcessFunc.getMaxOutputDelay)
     val testHarness: KeyedTwoInputStreamOperatorTestHarness[String, CRow, CRow, CRow] =
       new KeyedTwoInputStreamOperatorTestHarness[String, CRow, CRow, CRow](
         operator,
@@ -394,13 +407,21 @@ class JoinHarnessTest extends HarnessTestBase {
     assertEquals(0, testHarness.numKeyedStateEntries())
 
     val expectedOutput = new ConcurrentLinkedQueue[Object]()
+    expectedOutput.add(new Watermark(-9))
+    expectedOutput.add(new Watermark(-8))
     expectedOutput.add(new StreamRecord(
       CRow(Row.of(3L: JLong, "k1", 13L: JLong, "k1"), true), 0))
     expectedOutput.add(new StreamRecord(
       CRow(Row.of(6L: JLong, "k1", 13L: JLong, "k1"), true), 0))
+    expectedOutput.add(new Watermark(0))
+    expectedOutput.add(new Watermark(8))
 
     val result = testHarness.getOutput
-    verify(expectedOutput, result, new RowResultSortComparator())
+    verify(
+      expectedOutput,
+      result,
+      new RowResultSortComparatorWithWatermarks(),
+      checkWaterMark = true)
     testHarness.close()
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1ea7f49a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
index 13bfbcd..015a5a2 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
@@ -132,51 +132,54 @@ class JoinITCase extends StreamingWithStateTestBase {
     env.setStateBackend(getStateBackend)
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
     StreamITCase.clear
-    env.setParallelism(1)
 
     val sqlQuery =
       """
-        |SELECT t2.a, t2.c, t1.c
+        |SELECT t2.key, t2.id, t1.id
         |FROM T1 as t1 join T2 as t2 ON
-        |  t1.a = t2.a AND
+        |  t1.key = t2.key AND
         |  t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND
         |    t2.rt + INTERVAL '6' SECOND
         |""".stripMargin
 
-    val data1 = new mutable.MutableList[(Int, Long, String, Long)]
+    val data1 = new mutable.MutableList[(String, String, Long)]
     // for boundary test
-    data1.+=((1, 999L, "LEFT0.999", 999L))
-    data1.+=((1, 1000L, "LEFT1", 1000L))
-    data1.+=((1, 2000L, "LEFT2", 2000L))
-    data1.+=((1, 3000L, "LEFT3", 3000L))
-    data1.+=((2, 4000L, "LEFT4", 4000L))
-    data1.+=((1, 5000L, "LEFT5", 5000L))
-    data1.+=((1, 6000L, "LEFT6", 6000L))
-
-    val data2 = new mutable.MutableList[(Int, Long, String, Long)]
-    data2.+=((1, 6000L, "RIGHT6", 6000L))
-    data2.+=((2, 7000L, "RIGHT7", 7000L))
+    data1.+=(("A", "LEFT0.999", 999L))
+    data1.+=(("A", "LEFT1", 1000L))
+    data1.+=(("A", "LEFT2", 2000L))
+    data1.+=(("A", "LEFT3", 3000L))
+    data1.+=(("B", "LEFT4", 4000L))
+    data1.+=(("A", "LEFT5", 5000L))
+    data1.+=(("A", "LEFT6", 6000L))
+    // test null key
+    data1.+=((null.asInstanceOf[String], "LEFT8", 8000L))
+
+    val data2 = new mutable.MutableList[(String, String, Long)]
+    data2.+=(("A", "RIGHT6", 6000L))
+    data2.+=(("B", "RIGHT7", 7000L))
+    // test null key
+    data2.+=((null.asInstanceOf[String], "RIGHT10", 10000L))
 
     val t1 = env.fromCollection(data1)
-      .assignTimestampsAndWatermarks(new Tuple2WatermarkExtractor)
-      .toTable(tEnv, 'a, 'b, 'c, 'rt.rowtime)
+      .assignTimestampsAndWatermarks(new Row3WatermarkExtractor2)
+      .toTable(tEnv, 'key, 'id, 'rt.rowtime)
     val t2 = env.fromCollection(data2)
-      .assignTimestampsAndWatermarks(new Tuple2WatermarkExtractor)
-      .toTable(tEnv, 'a, 'b, 'c, 'rt.rowtime)
+      .assignTimestampsAndWatermarks(new Row3WatermarkExtractor2)
+      .toTable(tEnv, 'key, 'id, 'rt.rowtime)
 
     tEnv.registerTable("T1", t1)
     tEnv.registerTable("T2", t2)
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
     val expected = new java.util.ArrayList[String]
-    expected.add("1,RIGHT6,LEFT1")
-    expected.add("1,RIGHT6,LEFT2")
-    expected.add("1,RIGHT6,LEFT3")
-    expected.add("1,RIGHT6,LEFT5")
-    expected.add("1,RIGHT6,LEFT6")
-    expected.add("2,RIGHT7,LEFT4")
+    expected.add("A,RIGHT6,LEFT1")
+    expected.add("A,RIGHT6,LEFT2")
+    expected.add("A,RIGHT6,LEFT3")
+    expected.add("A,RIGHT6,LEFT5")
+    expected.add("A,RIGHT6,LEFT6")
+    expected.add("B,RIGHT7,LEFT4")
     StreamITCase.compareWithList(expected)
   }
 
@@ -189,9 +192,6 @@ class JoinITCase extends StreamingWithStateTestBase {
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
     StreamITCase.clear
 
-    // different parallelisms lead to different join results
-    env.setParallelism(1)
-
     val sqlQuery =
       """
         |SELECT t2.a, t1.c, t2.c
@@ -215,8 +215,6 @@ class JoinITCase extends StreamingWithStateTestBase {
     data1.+=((1, 4L, "LEFT4.9", 4999L))
     data1.+=((1, 4L, "LEFT5", 5000L))
     data1.+=((1, 10L, "LEFT6", 6000L))
-    // a left late row
-    data1.+=((1, 3L, "LEFT3.5", 3500L))
 
     val data2 = new mutable.MutableList[(Int, Long, String, Long)]
     // just for watermark
@@ -224,20 +222,18 @@ class JoinITCase extends StreamingWithStateTestBase {
     data2.+=((1, 9L, "RIGHT6", 6000L))
     data2.+=((2, 14L, "RIGHT7", 7000L))
     data2.+=((1, 4L, "RIGHT8", 8000L))
-    // a right late row
-    data2.+=((1, 10L, "RIGHT5", 5000L))
 
     val t1 = env.fromCollection(data1)
-      .assignTimestampsAndWatermarks(new Tuple2WatermarkExtractor)
+      .assignTimestampsAndWatermarks(new Row4WatermarkExtractor)
       .toTable(tEnv, 'a, 'b, 'c, 'rt.rowtime)
     val t2 = env.fromCollection(data2)
-      .assignTimestampsAndWatermarks(new Tuple2WatermarkExtractor)
+      .assignTimestampsAndWatermarks(new Row4WatermarkExtractor)
       .toTable(tEnv, 'a, 'b, 'c, 'rt.rowtime)
 
     tEnv.registerTable("T1", t1)
     tEnv.registerTable("T2", t2)
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -247,34 +243,131 @@ class JoinITCase extends StreamingWithStateTestBase {
     expected1+= "1,LEFT1.1,RIGHT6"
     expected1+= "2,LEFT4,RIGHT7"
     expected1+= "1,LEFT4.9,RIGHT6"
-    // produced by the left late rows
-    expected1+= "1,LEFT3.5,RIGHT6"
-    expected1+= "1,LEFT3.5,RIGHT8"
-    // produced by the right late rows
-    expected1+= "1,LEFT3,RIGHT5"
-    expected1+= "1,LEFT3.5,RIGHT5"
 
     val expected2 = new mutable.MutableList[String]
     expected2+= "1,LEFT3,RIGHT6"
     expected2+= "1,LEFT1.1,RIGHT6"
     expected2+= "2,LEFT4,RIGHT7"
     expected2+= "1,LEFT4.9,RIGHT6"
-    // produced by the left late rows
-    expected2+= "1,LEFT3.5,RIGHT6"
-    expected2+= "1,LEFT3.5,RIGHT8"
-    // produced by the right late rows
-    expected2+= "1,LEFT3,RIGHT5"
-    expected2+= "1,LEFT1,RIGHT5"
-    expected2+= "1,LEFT1.1,RIGHT5"
 
     Assert.assertThat(
       StreamITCase.testResults.sorted,
       CoreMatchers.either(CoreMatchers.is(expected1.sorted)).
         or(CoreMatchers.is(expected2.sorted)))
   }
+
+  /** Rowtime window join followed by a TUMBLE aggregation on the left input's (T1) rowtime. */
+  @Test
+  def testRowTimeInnerJoinWithWindowAggregateOnFirstTime(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setStateBackend(getStateBackend)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    StreamITCase.clear
+
+    val sqlQuery =
+      """
+        |SELECT t1.key, TUMBLE_END(t1.rt, INTERVAL '4' SECOND), COUNT(t2.key)
+        |FROM T1 AS t1 join T2 AS t2 ON
+        |  t1.key = t2.key AND
+        |  t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND
+        |    t2.rt + INTERVAL '5' SECOND
+        |GROUP BY TUMBLE(t1.rt, INTERVAL '4' SECOND), t1.key
+        |""".stripMargin
+
+    val data1 = new mutable.MutableList[(String, String, Long)]
+    data1.+=(("A", "L-1", 1000L))  // no joining record
+    data1.+=(("A", "L-2", 2000L))  // 1 joining record
+    data1.+=(("A", "L-3", 3000L))  // 2 joining records
+    data1.+=(("B", "L-4", 4000L))  // 1 joining record
+    data1.+=(("C", "L-5", 4000L))  // no joining record
+    data1.+=(("A", "L-6", 10000L)) // 2 joining records
+    data1.+=(("A", "L-7", 13000L)) // 1 joining record
+
+    val data2 = new mutable.MutableList[(String, String, Long)]
+    data2.+=(("A", "R-1", 7000L)) // 3 joining records
+    data2.+=(("B", "R-4", 7000L)) // 1 joining record
+    data2.+=(("A", "R-3", 8000L)) // 3 joining records
+    data2.+=(("D", "R-2", 8000L)) // no joining record
+
+    val t1 = env.fromCollection(data1)
+      .assignTimestampsAndWatermarks(new Row3WatermarkExtractor2)
+      .toTable(tEnv, 'key, 'id, 'rt.rowtime)
+    val t2 = env.fromCollection(data2)
+      .assignTimestampsAndWatermarks(new Row3WatermarkExtractor2)
+      .toTable(tEnv, 'key, 'id, 'rt.rowtime)
+
+    tEnv.registerTable("T1", t1)
+    tEnv.registerTable("T2", t2)
+
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+    val expected = new java.util.ArrayList[String] // only windows with joined rows emit
+    expected.add("A,1970-01-01 00:00:04.0,3") // window [0s, 4s): L-2 (1) + L-3 (2)
+    expected.add("A,1970-01-01 00:00:12.0,2") // window [8s, 12s): L-6 (2)
+    expected.add("A,1970-01-01 00:00:16.0,1") // window [12s, 16s): L-7 (1)
+    expected.add("B,1970-01-01 00:00:08.0,1") // window [4s, 8s): L-4 (1)
+    StreamITCase.compareWithList(expected)
+  }
+
+  /** Rowtime window join followed by a TUMBLE aggregation on the right input's (T2) rowtime. */
+  @Test
+  def testRowTimeInnerJoinWithWindowAggregateOnSecondTime(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setStateBackend(getStateBackend)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    StreamITCase.clear
+
+    val sqlQuery =
+      """
+        |SELECT t2.key, TUMBLE_END(t2.rt, INTERVAL '4' SECOND), COUNT(t1.key)
+        |FROM T1 AS t1 join T2 AS t2 ON
+        |  t1.key = t2.key AND
+        |  t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND
+        |    t2.rt + INTERVAL '5' SECOND
+        |GROUP BY TUMBLE(t2.rt, INTERVAL '4' SECOND), t2.key
+        |""".stripMargin
+
+    val data1 = new mutable.MutableList[(String, String, Long)]
+    data1.+=(("A", "L-1", 1000L))  // no joining record
+    data1.+=(("A", "L-2", 2000L))  // 1 joining record
+    data1.+=(("A", "L-3", 3000L))  // 2 joining records
+    data1.+=(("B", "L-4", 4000L))  // 1 joining record
+    data1.+=(("C", "L-5", 4000L))  // no joining record
+    data1.+=(("A", "L-6", 10000L)) // 2 joining records
+    data1.+=(("A", "L-7", 13000L)) // 1 joining record
+
+    val data2 = new mutable.MutableList[(String, String, Long)]
+    data2.+=(("A", "R-1", 7000L)) // 3 joining records
+    data2.+=(("B", "R-4", 7000L)) // 1 joining record
+    data2.+=(("A", "R-3", 8000L)) // 3 joining records
+    data2.+=(("D", "R-2", 8000L)) // no joining record
+
+    val t1 = env.fromCollection(data1)
+      .assignTimestampsAndWatermarks(new Row3WatermarkExtractor2)
+      .toTable(tEnv, 'key, 'id, 'rt.rowtime)
+    val t2 = env.fromCollection(data2)
+      .assignTimestampsAndWatermarks(new Row3WatermarkExtractor2)
+      .toTable(tEnv, 'key, 'id, 'rt.rowtime)
+
+    tEnv.registerTable("T1", t1)
+    tEnv.registerTable("T2", t2)
+
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+    val expected = new java.util.ArrayList[String] // only windows with joined rows emit
+    expected.add("A,1970-01-01 00:00:08.0,3") // window [4s, 8s): R-1 (3)
+    expected.add("A,1970-01-01 00:00:12.0,3") // window [8s, 12s): R-3 (3)
+    expected.add("B,1970-01-01 00:00:08.0,1") // window [4s, 8s): R-4 (1)
+    StreamITCase.compareWithList(expected)
+  }
+
 }
 
-private class Tuple2WatermarkExtractor
+private class Row4WatermarkExtractor
   extends AssignerWithPunctuatedWatermarks[(Int, Long, String, Long)] {
 
   override def checkAndGetNextWatermark(
@@ -289,3 +382,19 @@ private class Tuple2WatermarkExtractor
     element._4
   }
 }
+
+/** Takes the third tuple field as event time and emits a watermark 1 ms behind every row. */
+private class Row3WatermarkExtractor2
+  extends AssignerWithPunctuatedWatermarks[(String, String, Long)] {
+
+  override def extractTimestamp(
+    element: (String, String, Long),
+    previousElementTimestamp: Long): Long = element match {
+    case (_, _, rowtime) => rowtime
+  }
+
+  override def checkAndGetNextWatermark(
+    lastElement: (String, String, Long),
+    extractedTimestamp: Long): Watermark =
+    new Watermark(extractedTimestamp - 1)
+}

Reply via email to