[FLINK-6233] [table] Add more tests for rowtime window join + minor refactoring.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1ea7f49a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1ea7f49a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1ea7f49a Branch: refs/heads/master Commit: 1ea7f49a5030ae481122d34915ca14d30b8626f5 Parents: 655d8b1 Author: Fabian Hueske <[email protected]> Authored: Tue Oct 10 14:48:24 2017 +0200 Committer: Fabian Hueske <[email protected]> Committed: Tue Oct 10 23:09:07 2017 +0200 ---------------------------------------------------------------------- .../nodes/datastream/DataStreamWindowJoin.scala | 16 +- .../datastream/DataStreamWindowJoinRule.scala | 17 +- .../join/ProcTimeBoundedStreamInnerJoin.scala | 17 +- .../join/RowTimeBoundedStreamInnerJoin.scala | 18 +- .../join/TimeBoundedStreamInnerJoin.scala | 38 ++-- .../table/runtime/join/WindowJoinUtil.scala | 21 -- .../flink/table/api/stream/sql/JoinTest.scala | 93 ++++++++- .../table/runtime/harness/HarnessTestBase.scala | 20 ++ .../table/runtime/harness/JoinHarnessTest.scala | 53 +++-- .../table/runtime/stream/sql/JoinITCase.scala | 209 ++++++++++++++----- 10 files changed, 368 insertions(+), 134 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/1ea7f49a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala index 9358aa3..3e23006 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala @@ -136,6 +136,11 @@ class DataStreamWindowJoin( remainCondition, ruleDescription) + val joinOpName = + s"where: (" + + s"${joinConditionToString(schema.relDataType, joinCondition, getExpressionString)}), " + + s"join: (${joinSelectionToString(schema.relDataType)})" + joinType match { case JoinRelType.INNER => if (relativeWindowSize < 0) { @@ -148,6 +153,7 @@ class DataStreamWindowJoin( leftDataStream, rightDataStream, returnTypeInfo, + joinOpName, joinFunction.name, joinFunction.code, leftKeys, @@ -158,6 +164,7 @@ class DataStreamWindowJoin( leftDataStream, rightDataStream, returnTypeInfo, + joinOpName, joinFunction.name, joinFunction.code, leftKeys, @@ -202,6 +209,7 @@ class DataStreamWindowJoin( leftDataStream: DataStream[CRow], rightDataStream: DataStream[CRow], returnTypeInfo: TypeInformation[CRow], + operatorName: String, joinFunctionName: String, joinFunctionCode: String, leftKeys: Array[Int], @@ -210,7 +218,6 @@ class DataStreamWindowJoin( val procInnerJoinFunc = new ProcTimeBoundedStreamInnerJoin( leftLowerBound, leftUpperBound, - allowedLateness = 0L, leftSchema.typeInfo, rightSchema.typeInfo, joinFunctionName, @@ -220,6 +227,7 @@ class DataStreamWindowJoin( leftDataStream.connect(rightDataStream) .keyBy(leftKeys, rightKeys) .process(procInnerJoinFunc) + .name(operatorName) .returns(returnTypeInfo) } else { leftDataStream.connect(rightDataStream) @@ -227,6 +235,7 @@ class DataStreamWindowJoin( .process(procInnerJoinFunc) .setParallelism(1) .setMaxParallelism(1) + .name(operatorName) .returns(returnTypeInfo) } } @@ -235,6 +244,7 @@ class DataStreamWindowJoin( leftDataStream: DataStream[CRow], rightDataStream: DataStream[CRow], returnTypeInfo: TypeInformation[CRow], + operatorName: String, joinFunctionName: String, joinFunctionCode: String, leftKeys: Array[Int], @@ -256,7 +266,7 @@ class DataStreamWindowJoin( .connect(rightDataStream) .keyBy(leftKeys, rightKeys) .transform( - "InnerRowtimeWindowJoin", + operatorName, returnTypeInfo, new KeyedCoProcessOperatorWithWatermarkDelay[Tuple, CRow, CRow, CRow]( rowTimeInnerJoinFunc, @@ -266,7 +276,7 @@ class DataStreamWindowJoin( leftDataStream.connect(rightDataStream) .keyBy(new NullByteKeySelector[CRow](), new NullByteKeySelector[CRow]) .transform( - "InnerRowtimeWindowJoin", + operatorName, returnTypeInfo, new KeyedCoProcessOperatorWithWatermarkDelay[java.lang.Byte, CRow, CRow, CRow]( rowTimeInnerJoinFunc, http://git-wip-us.apache.org/repos/asf/flink/blob/1ea7f49a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala index d208d2b..a7358c7 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala @@ -41,29 +41,22 @@ class DataStreamWindowJoinRule override def matches(call: RelOptRuleCall): Boolean = { val join: FlinkLogicalJoin = call.rel(0).asInstanceOf[FlinkLogicalJoin] - val (windowBounds, remainingPreds) = WindowJoinUtil.extractWindowBoundsFromPredicate( + val (windowBounds, _) = WindowJoinUtil.extractWindowBoundsFromPredicate( join.getCondition, join.getLeft.getRowType.getFieldCount, join.getRowType, join.getCluster.getRexBuilder, TableConfig.DEFAULT) - // remaining predicate must not access time attributes - val remainingPredsAccessTime = remainingPreds.isDefined && - WindowJoinUtil.accessesTimeAttribute(remainingPreds.get, join.getRowType) - if (windowBounds.isDefined) { if (windowBounds.get.isEventTime) { - !remainingPredsAccessTime + true } else { - // Check that no event-time attributes are in the input. - // The proc-time join implementation does ensure that record timestamp are correctly set. - // It is always the timestamp of the later arriving record. + // Check that no event-time attributes are in the input because the processing time window + // join does not correctly hold back watermarks. // We rely on projection pushdown to remove unused attributes before the join. - val rowTimeAttrInOutput = join.getRowType.getFieldList.asScala + !join.getRowType.getFieldList.asScala .exists(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType)) - - !remainingPredsAccessTime && !rowTimeAttrInOutput } } else { // the given join does not have valid window bounds. We cannot translate it. http://git-wip-us.apache.org/repos/asf/flink/blob/1ea7f49a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeBoundedStreamInnerJoin.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeBoundedStreamInnerJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeBoundedStreamInnerJoin.scala index ab5a9c3..3bac42c 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeBoundedStreamInnerJoin.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeBoundedStreamInnerJoin.scala @@ -29,19 +29,18 @@ import org.apache.flink.types.Row final class ProcTimeBoundedStreamInnerJoin( leftLowerBound: Long, leftUpperBound: Long, - allowedLateness: Long, leftType: TypeInformation[Row], rightType: TypeInformation[Row], genJoinFuncName: String, genJoinFuncCode: String) - extends TimeBoundedStreamInnerJoin( - leftLowerBound, - leftUpperBound, - allowedLateness, - leftType, - rightType, - genJoinFuncName, - genJoinFuncCode) { + extends TimeBoundedStreamInnerJoin( + leftLowerBound, + leftUpperBound, + allowedLateness = 0L, + leftType, + rightType, + genJoinFuncName, + genJoinFuncCode) { override def updateOperatorTime(ctx: CoProcessFunction[CRow, CRow, CRow]#Context): Unit = { leftOperatorTime = ctx.timerService().currentProcessingTime() http://git-wip-us.apache.org/repos/asf/flink/blob/1ea7f49a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/RowTimeBoundedStreamInnerJoin.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/RowTimeBoundedStreamInnerJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/RowTimeBoundedStreamInnerJoin.scala index 5cf5a53..a2d9dca 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/RowTimeBoundedStreamInnerJoin.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/RowTimeBoundedStreamInnerJoin.scala @@ -36,18 +36,18 @@ final class RowTimeBoundedStreamInnerJoin( genJoinFuncCode: String, leftTimeIdx: Int, rightTimeIdx: Int) - extends TimeBoundedStreamInnerJoin( - leftLowerBound, - leftUpperBound, - allowedLateness, - leftType, - rightType, - genJoinFuncName, - genJoinFuncCode) { + extends TimeBoundedStreamInnerJoin( + leftLowerBound, + leftUpperBound, + allowedLateness, + leftType, + rightType, + genJoinFuncName, + genJoinFuncCode) { /** * Get the maximum interval between receiving a row and emitting it (as part of a joined result). - * Only reasonable for row time join. + * This is the time interval by which watermarks need to be held back. * * @return the maximum delay for the outputs */ http://git-wip-us.apache.org/repos/asf/flink/blob/1ea7f49a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala index 7bf3d33..9625eac 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala @@ -38,15 +38,16 @@ import org.apache.flink.util.Collector /** * A CoProcessFunction to execute time-bounded stream inner-join. * Two kinds of time criteria: - * "L.time between R.time + X and R.time + Y" or "R.time between L.time - Y and L.time - X". + * "L.time between R.time + X and R.time + Y" or "R.time between L.time - Y and L.time - X" where + * X and Y might be negative or positive and X <= Y. * * @param leftLowerBound the lower bound for the left stream (X in the criteria) * @param leftUpperBound the upper bound for the left stream (Y in the criteria) * @param allowedLateness the lateness allowed for the two streams * @param leftType the input type of left stream * @param rightType the input type of right stream - * @param genJoinFuncName the function code of other non-equi conditions - * @param genJoinFuncCode the function name of other non-equi conditions + * @param genJoinFuncName the name of the generated function + * @param genJoinFuncCode the code of function to evaluate the non-window join conditions * */ abstract class TimeBoundedStreamInnerJoin( @@ -57,9 +58,9 @@ abstract class TimeBoundedStreamInnerJoin( private val rightType: TypeInformation[Row], private val genJoinFuncName: String, private val genJoinFuncCode: String) - extends CoProcessFunction[CRow, CRow, CRow] - with Compiler[FlatJoinFunction[Row, Row, Row]] - with Logging { + extends CoProcessFunction[CRow, CRow, CRow] + with Compiler[FlatJoinFunction[Row, Row, Row]] + with Logging { private var cRowWrapper: CRowWrappingCollector = _ @@ -79,15 +80,16 @@ abstract class TimeBoundedStreamInnerJoin( protected val leftRelativeSize: Long = -leftLowerBound protected val rightRelativeSize: Long = leftUpperBound + // Points in time until which the respective cache has been cleaned. private var leftExpirationTime: Long = 0L private var rightExpirationTime: Long = 0L + // Current time on the respective input stream. protected var leftOperatorTime: Long = 0L protected var rightOperatorTime: Long = 0L - - // for delayed cleanup - private val cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2 + // Minimum interval by which state is cleaned up + private val minCleanUpInterval = (leftRelativeSize + rightRelativeSize) / 2 if (allowedLateness < 0) { throw new IllegalArgumentException("The allowed lateness must be non-negative.") @@ -140,12 +142,14 @@ abstract class TimeBoundedStreamInnerJoin( cRowValue: CRow, ctx: CoProcessFunction[CRow, CRow, CRow]#Context, out: Collector[CRow]): Unit = { + updateOperatorTime(ctx) val leftRow = cRowValue.row val timeForLeftRow: Long = getTimeForLeftStream(ctx, leftRow) val rightQualifiedLowerBound: Long = timeForLeftRow - rightRelativeSize val rightQualifiedUpperBound: Long = timeForLeftRow + leftRelativeSize cRowWrapper.out = out + // Check if we need to cache the current row. if (rightOperatorTime < rightQualifiedUpperBound) { // Operator time of right stream has not exceeded the upper window bound of the current @@ -164,7 +168,7 @@ abstract class TimeBoundedStreamInnerJoin( } // Check if we need to join the current row against cached rows of the right input. // The condition here should be rightMinimumTime < rightQualifiedUpperBound. - // I use rightExpirationTime as an approximation of the rightMinimumTime here, + // We use rightExpirationTime as an approximation of the rightMinimumTime here, // since rightExpirationTime <= rightMinimumTime is always true. if (rightExpirationTime < rightQualifiedUpperBound) { // Upper bound of current join window has not passed the cache expiration time yet. @@ -199,12 +203,14 @@ abstract class TimeBoundedStreamInnerJoin( cRowValue: CRow, ctx: CoProcessFunction[CRow, CRow, CRow]#Context, out: Collector[CRow]): Unit = { + updateOperatorTime(ctx) val rightRow = cRowValue.row val timeForRightRow: Long = getTimeForRightStream(ctx, rightRow) val leftQualifiedLowerBound: Long = timeForRightRow - leftRelativeSize val leftQualifiedUpperBound: Long = timeForRightRow + rightRelativeSize cRowWrapper.out = out + // Check if we need to cache the current row. if (leftOperatorTime < leftQualifiedUpperBound) { // Operator time of left stream has not exceeded the upper window bound of the current @@ -223,7 +229,7 @@ abstract class TimeBoundedStreamInnerJoin( } // Check if we need to join the current row against cached rows of the left input. // The condition here should be leftMinimumTime < leftQualifiedUpperBound. - // I use leftExpirationTime as an approximation of the leftMinimumTime here, + // We use leftExpirationTime as an approximation of the leftMinimumTime here, // since leftExpirationTime <= leftMinimumTime is always true. if (leftExpirationTime < leftQualifiedUpperBound) { leftExpirationTime = calExpirationTime(rightOperatorTime, leftRelativeSize) @@ -261,6 +267,7 @@ abstract class TimeBoundedStreamInnerJoin( timestamp: Long, ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext, out: Collector[CRow]): Unit = { + updateOperatorTime(ctx) // In the future, we should separate the left and right watermarks. Otherwise, the // registered timer of the faster stream will be delayed, even if the watermarks have @@ -316,11 +323,11 @@ abstract class TimeBoundedStreamInnerJoin( rowTime: Long, leftRow: Boolean): Unit = { if (leftRow) { - val cleanupTime = rowTime + leftRelativeSize + cleanupDelay + allowedLateness + 1 + val cleanupTime = rowTime + leftRelativeSize + minCleanUpInterval + allowedLateness + 1 registerTimer(ctx, cleanupTime) rightTimerState.update(cleanupTime) } else { - val cleanupTime = rowTime + rightRelativeSize + cleanupDelay + allowedLateness + 1 + val cleanupTime = rowTime + rightRelativeSize + minCleanUpInterval + allowedLateness + 1 registerTimer(ctx, cleanupTime) leftTimerState.update(cleanupTime) } @@ -361,6 +368,7 @@ abstract class TimeBoundedStreamInnerJoin( } } } + if (earliestTimestamp > 0) { // There are rows left in the cache. Register a timer to expire them later. registerCleanUpTimer( @@ -385,6 +393,8 @@ abstract class TimeBoundedStreamInnerJoin( /** * Return the time for the target row from the left stream. * + * Requires that [[updateOperatorTime()]] has been called before. + * * @param context the runtime context * @param row the target row * @return time for the target row @@ -394,6 +404,8 @@ abstract class TimeBoundedStreamInnerJoin( /** * Return the time for the target row from the right stream. * + * Requires that [[updateOperatorTime()]] has been called before. + * * @param context the runtime context * @param row the target row * @return time for the target row http://git-wip-us.apache.org/repos/asf/flink/blob/1ea7f49a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala index 6f97f2a..863f342 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala @@ -266,27 +266,6 @@ object WindowJoinUtil { } /** - * Checks if an expression accesses a time attribute. - * - * @param expr The expression to check. - * @param inputType The input type of the expression. - * @return True, if the expression accesses a time attribute. False otherwise. - */ - def accessesTimeAttribute(expr: RexNode, inputType: RelDataType): Boolean = { - expr match { - case i: RexInputRef => - val accessedType = inputType.getFieldList.get(i.getIndex).getType - accessedType match { - case _: TimeIndicatorRelDataType => true - case _ => false - } - case c: RexCall => - c.operands.asScala.exists(accessesTimeAttribute(_, inputType)) - case _ => false - } - } - - /** * Checks if an expression accesses a non-time attribute. * * @param expr The expression to check. http://git-wip-us.apache.org/repos/asf/flink/blob/1ea7f49a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala index a4234c5..53aff82 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala @@ -20,8 +20,9 @@ package org.apache.flink.table.api.stream.sql import org.apache.calcite.rel.logical.LogicalJoin import org.apache.flink.api.scala._ import org.apache.flink.table.api.scala._ +import org.apache.flink.table.plan.logical.TumblingGroupWindow import org.apache.flink.table.runtime.join.WindowJoinUtil -import org.apache.flink.table.utils.TableTestUtil._ +import org.apache.flink.table.utils.TableTestUtil.{term, _} import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase} import org.junit.Assert._ import org.junit.Test @@ -184,6 +185,96 @@ class JoinTest extends TableTestBase { } @Test + def testRowTimeInnerJoinAndWindowAggregationOnFirst(): Unit = { + + val sqlQuery = + """ + |SELECT t1.b, SUM(t2.a) AS aSum, COUNT(t2.b) AS bCnt + |FROM MyTable t1, MyTable2 t2 + |WHERE t1.a = t2.a AND + | t1.c BETWEEN t2.c - INTERVAL '10' MINUTE AND t2.c + INTERVAL '1' HOUR + |GROUP BY TUMBLE(t1.c, INTERVAL '6' HOUR), t1.b + |""".stripMargin + + val expected = + unaryNode( + "DataStreamGroupWindowAggregate", + unaryNode( + "DataStreamCalc", + binaryNode( + "DataStreamWindowJoin", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "a", "b", "c") + ), + unaryNode( + "DataStreamCalc", + streamTableNode(1), + term("select", "a", "b", "c") + ), + term("where", + "AND(=(a, a0), >=(c, -(c0, 600000)), " + + "<=(c, DATETIME_PLUS(c0, 3600000)))"), + term("join", "a, b, c, a0, b0, c0"), + term("joinType", "InnerJoin") + ), + term("select", "c", "b", "a0", "b0") + ), + term("groupBy", "b"), + term("window", TumblingGroupWindow('w$, 'c, 21600000.millis)), + term("select", "b", "SUM(a0) AS aSum", "COUNT(b0) AS bCnt") + ) + + streamUtil.verifySql(sqlQuery, expected) + } + + @Test + def testRowTimeInnerJoinAndWindowAggregationOnSecond(): Unit = { + + val sqlQuery = + """ + |SELECT t2.b, SUM(t1.a) AS aSum, COUNT(t1.b) AS bCnt + |FROM MyTable t1, MyTable2 t2 + |WHERE t1.a = t2.a AND + | t1.c BETWEEN t2.c - INTERVAL '10' MINUTE AND t2.c + INTERVAL '1' HOUR + |GROUP BY TUMBLE(t2.c, INTERVAL '6' HOUR), t2.b + |""".stripMargin + + val expected = + unaryNode( + "DataStreamGroupWindowAggregate", + unaryNode( + "DataStreamCalc", + binaryNode( + "DataStreamWindowJoin", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "a", "b", "c") + ), + unaryNode( + "DataStreamCalc", + streamTableNode(1), + term("select", "a", "b", "c") + ), + term("where", + "AND(=(a, a0), >=(c, -(c0, 600000)), " + + "<=(c, DATETIME_PLUS(c0, 3600000)))"), + term("join", "a, b, c, a0, b0, c0"), + term("joinType", "InnerJoin") + ), + term("select", "c0", "b0", "a", "b") + ), + term("groupBy", "b0"), + term("window", TumblingGroupWindow('w$, 'c0, 21600000.millis)), + term("select", "b0", "SUM(a) AS aSum", "COUNT(b) AS bCnt") + ) + + streamUtil.verifySql(sqlQuery, expected) + } + + @Test def testJoinTimeBoundary(): Unit = { verifyTimeBoundary( "t1.proctime between t2.proctime - interval '1' hour " + http://git-wip-us.apache.org/repos/asf/flink/blob/1ea7f49a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala index 67164b7..942846c 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala @@ -355,6 +355,26 @@ object HarnessTestBase { } /** + * Return 0 for equal Rows and non zero for different rows + */ + class RowResultSortComparatorWithWatermarks() + extends Comparator[Object] with Serializable { + + override def compare(o1: Object, o2: Object): Int = { + + (o1, o2) match { + case (w1: Watermark, w2: Watermark) => + w1.getTimestamp.compareTo(w2.getTimestamp) + case (r1: StreamRecord[CRow], r2: StreamRecord[CRow]) => + r1.getValue.toString.compareTo(r2.getValue.toString) + case (_: Watermark, _: StreamRecord[CRow]) => -1 + case (_: StreamRecord[CRow], _: Watermark) => 1 + case _ => -1 + } + } + } + + /** * Tuple row key selector that returns a specified field as the selector function */ class TupleRowKeySelector[T]( http://git-wip-us.apache.org/repos/asf/flink/blob/1ea7f49a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala index 192befd..43397ae 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala @@ -25,11 +25,12 @@ import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.streaming.runtime.streamrecord.StreamRecord import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness import org.apache.flink.table.api.Types -import org.apache.flink.table.runtime.harness.HarnessTestBase.{RowResultSortComparator, TupleRowKeySelector} +import org.apache.flink.table.runtime.harness.HarnessTestBase.{RowResultSortComparator, RowResultSortComparatorWithWatermarks, TupleRowKeySelector} import org.apache.flink.table.runtime.join.{ProcTimeBoundedStreamInnerJoin, RowTimeBoundedStreamInnerJoin} +import org.apache.flink.table.runtime.operators.KeyedCoProcessOperatorWithWatermarkDelay import org.apache.flink.table.runtime.types.CRow import org.apache.flink.types.Row -import org.junit.Assert.{assertEquals} +import org.junit.Assert.assertEquals import org.junit.Test class JoinHarnessTest extends HarnessTestBase { @@ -75,7 +76,7 @@ class JoinHarnessTest extends HarnessTestBase { def testProcTimeJoinWithCommonBounds() { val joinProcessFunc = new ProcTimeBoundedStreamInnerJoin( - -10, 20, 0, rowType, rowType, "TestJoinFunction", funcCode) + -10, 20, rowType, rowType, "TestJoinFunction", funcCode) val operator: KeyedCoProcessOperator[Integer, CRow, CRow, CRow] = new KeyedCoProcessOperator[Integer, CRow, CRow, CRow](joinProcessFunc) @@ -165,7 +166,7 @@ class JoinHarnessTest extends HarnessTestBase { def testProcTimeJoinWithNegativeBounds() { val joinProcessFunc = new ProcTimeBoundedStreamInnerJoin( - -10, -5, 0, rowType, rowType, "TestJoinFunction", funcCode) + -10, -5, rowType, rowType, "TestJoinFunction", funcCode) val operator: KeyedCoProcessOperator[Integer, CRow, CRow, CRow] = new KeyedCoProcessOperator[Integer, CRow, CRow, CRow](joinProcessFunc) @@ -250,7 +251,9 @@ class JoinHarnessTest extends HarnessTestBase { -10, 20, 0, rowType, rowType, "TestJoinFunction", funcCode, 0, 0) val operator: KeyedCoProcessOperator[String, CRow, CRow, CRow] = - new KeyedCoProcessOperator[String, CRow, CRow, CRow](joinProcessFunc) + new KeyedCoProcessOperatorWithWatermarkDelay[String, CRow, CRow, CRow]( + joinProcessFunc, + joinProcessFunc.getMaxOutputDelay) val testHarness: KeyedTwoInputStreamOperatorTestHarness[String, CRow, CRow, CRow] = new KeyedTwoInputStreamOperatorTestHarness[String, CRow, CRow, CRow]( operator, @@ -312,23 +315,31 @@ class JoinHarnessTest extends HarnessTestBase { assertEquals(4, testHarness.numKeyedStateEntries()) val expectedOutput = new ConcurrentLinkedQueue[Object]() + expectedOutput.add(new Watermark(-19)) + // This result is produced by the late row (1, "k1"). + expectedOutput.add(new StreamRecord( + CRow(Row.of(1L: JLong, "k1", 2L: JLong, "k1"), true), 0)) expectedOutput.add(new StreamRecord( CRow(Row.of(2L: JLong, "k1", 2L: JLong, "k1"), true), 0)) expectedOutput.add(new StreamRecord( - CRow(Row.of(5L: JLong, "k1", 2L: JLong, "k1"), true), 0)) + CRow(Row.of(5L: JLong, "k1", 2L: JLong, "k1"), true), 0)) expectedOutput.add(new StreamRecord( - CRow(Row.of(5L: JLong, "k1", 15L: JLong, "k1"), true), 0)) + CRow(Row.of(5L: JLong, "k1", 15L: JLong, "k1"), true), 0)) + expectedOutput.add(new Watermark(0)) expectedOutput.add(new StreamRecord( - CRow(Row.of(35L: JLong, "k1", 15L: JLong, "k1"), true), 0)) + CRow(Row.of(35L: JLong, "k1", 15L: JLong, "k1"), true), 0)) + expectedOutput.add(new Watermark(18)) expectedOutput.add(new StreamRecord( - CRow(Row.of(40L: JLong, "k2", 39L: JLong, "k2"), true), 0)) - - // This result is produced by the late row (1, "k1"). - expectedOutput.add(new StreamRecord( - CRow(Row.of(1L: JLong, "k1", 2L: JLong, "k1"), true), 0)) + CRow(Row.of(40L: JLong, "k2", 39L: JLong, "k2"), true), 0)) + expectedOutput.add(new Watermark(41)) val result = testHarness.getOutput - verify(expectedOutput, result, new RowResultSortComparator()) + println(result) + verify( + expectedOutput, + result, + new RowResultSortComparatorWithWatermarks(), + checkWaterMark = true) testHarness.close() } @@ -340,7 +351,9 @@ class JoinHarnessTest extends HarnessTestBase { -10, -7, 0, rowType, rowType, "TestJoinFunction", funcCode, 0, 0) val operator: KeyedCoProcessOperator[String, CRow, CRow, CRow] = - new KeyedCoProcessOperator[String, CRow, CRow, CRow](joinProcessFunc) + new KeyedCoProcessOperatorWithWatermarkDelay[String, CRow, CRow, CRow]( + joinProcessFunc, + joinProcessFunc.getMaxOutputDelay) val testHarness: KeyedTwoInputStreamOperatorTestHarness[String, CRow, CRow, CRow] = new KeyedTwoInputStreamOperatorTestHarness[String, CRow, CRow, CRow]( operator, @@ -394,13 +407,21 @@ class JoinHarnessTest extends HarnessTestBase { assertEquals(0, testHarness.numKeyedStateEntries()) val expectedOutput = new ConcurrentLinkedQueue[Object]() + expectedOutput.add(new Watermark(-9)) + expectedOutput.add(new Watermark(-8)) expectedOutput.add(new StreamRecord( CRow(Row.of(3L: JLong, "k1", 13L: JLong, "k1"), true), 0)) expectedOutput.add(new StreamRecord( CRow(Row.of(6L: JLong, "k1", 13L: JLong, "k1"), true), 0)) + expectedOutput.add(new Watermark(0)) + expectedOutput.add(new Watermark(8)) val result = testHarness.getOutput - verify(expectedOutput, result, new RowResultSortComparator()) + verify( + expectedOutput, + result, + new RowResultSortComparatorWithWatermarks(), + checkWaterMark = true) testHarness.close() } } http://git-wip-us.apache.org/repos/asf/flink/blob/1ea7f49a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala index 13bfbcd..015a5a2 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala @@ -132,51 +132,54 @@ class JoinITCase extends StreamingWithStateTestBase { env.setStateBackend(getStateBackend) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) StreamITCase.clear - env.setParallelism(1) val sqlQuery = """ - |SELECT t2.a, t2.c, t1.c + |SELECT t2.key, t2.id, t1.id |FROM T1 as t1 join T2 as t2 ON - | t1.a = t2.a AND + | t1.key = t2.key AND | t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND | t2.rt + INTERVAL '6' SECOND |""".stripMargin - val data1 = new mutable.MutableList[(Int, Long, String, Long)] + val data1 = new mutable.MutableList[(String, String, Long)] // for boundary test - data1.+=((1, 999L, "LEFT0.999", 999L)) - data1.+=((1, 1000L, "LEFT1", 1000L)) - data1.+=((1, 2000L, "LEFT2", 2000L)) - data1.+=((1, 3000L, "LEFT3", 3000L)) - data1.+=((2, 4000L, "LEFT4", 4000L)) - data1.+=((1, 5000L, "LEFT5", 5000L)) - data1.+=((1, 6000L, "LEFT6", 6000L)) - - val data2 = new mutable.MutableList[(Int, Long, String, Long)] - data2.+=((1, 6000L, "RIGHT6", 6000L)) - data2.+=((2, 7000L, "RIGHT7", 7000L)) + data1.+=(("A", "LEFT0.999", 999L)) + data1.+=(("A", "LEFT1", 1000L)) + data1.+=(("A", "LEFT2", 2000L)) + data1.+=(("A", "LEFT3", 3000L)) + data1.+=(("B", "LEFT4", 4000L)) + data1.+=(("A", "LEFT5", 5000L)) + data1.+=(("A", "LEFT6", 6000L)) + // test null key + data1.+=((null.asInstanceOf[String], "LEFT8", 8000L)) + + val data2 = new mutable.MutableList[(String, String, Long)] + data2.+=(("A", "RIGHT6", 6000L)) + data2.+=(("B", "RIGHT7", 7000L)) + // test null key + data2.+=((null.asInstanceOf[String], "RIGHT10", 10000L)) val t1 = env.fromCollection(data1) - .assignTimestampsAndWatermarks(new Tuple2WatermarkExtractor) - .toTable(tEnv, 'a, 'b, 'c, 'rt.rowtime) + .assignTimestampsAndWatermarks(new Row3WatermarkExtractor2) + .toTable(tEnv, 'key, 'id, 'rt.rowtime) val t2 = env.fromCollection(data2) - .assignTimestampsAndWatermarks(new Tuple2WatermarkExtractor) - .toTable(tEnv, 'a, 'b, 'c, 'rt.rowtime) + .assignTimestampsAndWatermarks(new Row3WatermarkExtractor2) + .toTable(tEnv, 'key, 'id, 'rt.rowtime) tEnv.registerTable("T1", t1) tEnv.registerTable("T2", t2) - val result = tEnv.sql(sqlQuery).toAppendStream[Row] + val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] result.addSink(new StreamITCase.StringSink[Row]) env.execute() val expected = new java.util.ArrayList[String] - expected.add("1,RIGHT6,LEFT1") - expected.add("1,RIGHT6,LEFT2") - expected.add("1,RIGHT6,LEFT3") - expected.add("1,RIGHT6,LEFT5") - expected.add("1,RIGHT6,LEFT6") - expected.add("2,RIGHT7,LEFT4") + expected.add("A,RIGHT6,LEFT1") + expected.add("A,RIGHT6,LEFT2") + expected.add("A,RIGHT6,LEFT3") + expected.add("A,RIGHT6,LEFT5") + expected.add("A,RIGHT6,LEFT6") + expected.add("B,RIGHT7,LEFT4") StreamITCase.compareWithList(expected) } @@ -189,9 +192,6 @@ class JoinITCase extends StreamingWithStateTestBase { env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) StreamITCase.clear - // different parallelisms lead to different join results - env.setParallelism(1) - val sqlQuery = """ |SELECT t2.a, t1.c, t2.c @@ -215,8 +215,6 @@ class JoinITCase extends StreamingWithStateTestBase { data1.+=((1, 4L, "LEFT4.9", 4999L)) data1.+=((1, 4L, "LEFT5", 5000L)) data1.+=((1, 10L, "LEFT6", 6000L)) - // a left late row - data1.+=((1, 3L, "LEFT3.5", 3500L)) val data2 = new mutable.MutableList[(Int, Long, String, Long)] // just for watermark @@ -224,20 +222,18 @@ class JoinITCase extends StreamingWithStateTestBase { data2.+=((1, 9L, "RIGHT6", 6000L)) data2.+=((2, 14L, "RIGHT7", 7000L)) data2.+=((1, 4L, "RIGHT8", 8000L)) - // a right late row - data2.+=((1, 10L, "RIGHT5", 5000L)) val t1 = env.fromCollection(data1) - .assignTimestampsAndWatermarks(new Tuple2WatermarkExtractor) + .assignTimestampsAndWatermarks(new Row4WatermarkExtractor) .toTable(tEnv, 'a, 'b, 'c, 'rt.rowtime) val t2 = env.fromCollection(data2) - .assignTimestampsAndWatermarks(new Tuple2WatermarkExtractor) + .assignTimestampsAndWatermarks(new Row4WatermarkExtractor) .toTable(tEnv, 'a, 'b, 'c, 'rt.rowtime) tEnv.registerTable("T1", t1) tEnv.registerTable("T2", t2) - val result = tEnv.sql(sqlQuery).toAppendStream[Row] + val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] result.addSink(new StreamITCase.StringSink[Row]) env.execute() @@ -247,34 +243,131 @@ class JoinITCase extends StreamingWithStateTestBase { expected1+= "1,LEFT1.1,RIGHT6" expected1+= "2,LEFT4,RIGHT7" expected1+= "1,LEFT4.9,RIGHT6" - // produced by the left late rows - expected1+= "1,LEFT3.5,RIGHT6" - expected1+= "1,LEFT3.5,RIGHT8" - // produced by the right late rows - expected1+= "1,LEFT3,RIGHT5" - expected1+= "1,LEFT3.5,RIGHT5" val expected2 = new mutable.MutableList[String] expected2+= "1,LEFT3,RIGHT6" expected2+= "1,LEFT1.1,RIGHT6" expected2+= "2,LEFT4,RIGHT7" expected2+= "1,LEFT4.9,RIGHT6" - // produced by the left late rows - expected2+= "1,LEFT3.5,RIGHT6" - expected2+= "1,LEFT3.5,RIGHT8" - // produced by the right late rows - expected2+= "1,LEFT3,RIGHT5" - expected2+= "1,LEFT1,RIGHT5" - expected2+= "1,LEFT1.1,RIGHT5" Assert.assertThat( StreamITCase.testResults.sorted, CoreMatchers.either(CoreMatchers.is(expected1.sorted)). or(CoreMatchers.is(expected2.sorted))) } + + /** test rowtime inner join with window aggregation **/ + @Test + def testRowTimeInnerJoinWithWindowAggregateOnFirstTime(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + env.setStateBackend(getStateBackend) + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + StreamITCase.clear + + val sqlQuery = + """ + |SELECT t1.key, TUMBLE_END(t1.rt, INTERVAL '4' SECOND), COUNT(t2.key) + |FROM T1 AS t1 join T2 AS t2 ON + | t1.key = t2.key AND + | t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND + | t2.rt + INTERVAL '5' SECOND + |GROUP BY TUMBLE(t1.rt, INTERVAL '4' SECOND), t1.key + |""".stripMargin + + val data1 = new mutable.MutableList[(String, String, Long)] + data1.+=(("A", "L-1", 1000L)) // no joining record + data1.+=(("A", "L-2", 2000L)) // 1 joining record + data1.+=(("A", "L-3", 3000L)) // 2 joining records + data1.+=(("B", "L-4", 4000L)) // 1 joining record + data1.+=(("C", "L-5", 4000L)) // no joining record + data1.+=(("A", "L-6", 10000L)) // 2 joining records + data1.+=(("A", "L-7", 13000L)) // 1 joining record + + val data2 = new mutable.MutableList[(String, String, Long)] + data2.+=(("A", "R-1", 7000L)) // 3 joining records + data2.+=(("B", "R-4", 7000L)) // 1 joining records + data2.+=(("A", "R-3", 8000L)) // 3 joining records + data2.+=(("D", "R-2", 8000L)) // no joining record + + val t1 = env.fromCollection(data1) + .assignTimestampsAndWatermarks(new Row3WatermarkExtractor2) + .toTable(tEnv, 'key, 'id, 'rt.rowtime) + val t2 = env.fromCollection(data2) + .assignTimestampsAndWatermarks(new Row3WatermarkExtractor2) + .toTable(tEnv, 'key, 'id, 'rt.rowtime) + + tEnv.registerTable("T1", t1) + tEnv.registerTable("T2", t2) + + val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] + result.addSink(new StreamITCase.StringSink[Row]) + env.execute() + val expected = new java.util.ArrayList[String] + expected.add("A,1970-01-01 00:00:04.0,3") + expected.add("A,1970-01-01 00:00:12.0,2") + expected.add("A,1970-01-01 00:00:16.0,1") + expected.add("B,1970-01-01 00:00:08.0,1") + StreamITCase.compareWithList(expected) + } + + /** test rowtime inner join with window aggregation **/ + @Test + def testRowTimeInnerJoinWithWindowAggregateOnSecondTime(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + env.setStateBackend(getStateBackend) + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + StreamITCase.clear + + val sqlQuery = + """ + |SELECT t2.key, TUMBLE_END(t2.rt, INTERVAL '4' SECOND), COUNT(t1.key) + |FROM T1 AS t1 join T2 AS t2 ON + | t1.key = t2.key AND + | t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND + | t2.rt + INTERVAL '5' SECOND + |GROUP BY TUMBLE(t2.rt, INTERVAL '4' SECOND), t2.key + |""".stripMargin + + val data1 = new mutable.MutableList[(String, String, Long)] + data1.+=(("A", "L-1", 1000L)) // no joining record + data1.+=(("A", "L-2", 2000L)) // 1 joining record + data1.+=(("A", "L-3", 3000L)) // 2 joining records + data1.+=(("B", "L-4", 4000L)) // 1 joining record + data1.+=(("C", "L-5", 4000L)) // no joining record + data1.+=(("A", "L-6", 10000L)) // 2 joining records + data1.+=(("A", "L-7", 13000L)) // 1 joining record + + val data2 = new mutable.MutableList[(String, String, Long)] + data2.+=(("A", "R-1", 7000L)) // 3 joining records + data2.+=(("B", "R-4", 7000L)) // 1 joining records + data2.+=(("A", "R-3", 8000L)) // 3 joining records + data2.+=(("D", "R-2", 8000L)) // no joining record + + val t1 = env.fromCollection(data1) + .assignTimestampsAndWatermarks(new Row3WatermarkExtractor2) + .toTable(tEnv, 'key, 'id, 'rt.rowtime) + val t2 = env.fromCollection(data2) + .assignTimestampsAndWatermarks(new Row3WatermarkExtractor2) + .toTable(tEnv, 'key, 'id, 'rt.rowtime) + + tEnv.registerTable("T1", t1) + tEnv.registerTable("T2", t2) + + val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] + result.addSink(new StreamITCase.StringSink[Row]) + env.execute() + val expected = new java.util.ArrayList[String] + expected.add("A,1970-01-01 00:00:08.0,3") + expected.add("A,1970-01-01 00:00:12.0,3") + expected.add("B,1970-01-01 00:00:08.0,1") + StreamITCase.compareWithList(expected) + } + } -private class Tuple2WatermarkExtractor +private class Row4WatermarkExtractor extends AssignerWithPunctuatedWatermarks[(Int, Long, String, Long)] { override def checkAndGetNextWatermark( @@ -289,3 +382,19 @@ private class Tuple2WatermarkExtractor element._4 } } + +private class Row3WatermarkExtractor2 + extends AssignerWithPunctuatedWatermarks[(String, String, Long)] { + + override def checkAndGetNextWatermark( + lastElement: (String, String, Long), + extractedTimestamp: Long): Watermark = { + new Watermark(extractedTimestamp - 1) + } + + override def extractTimestamp( + element: (String, String, Long), + previousElementTimestamp: Long): Long = { + element._3 + } +}
