http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinTest.scala deleted file mode 100644 index 15e8b89..0000000 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinTest.scala +++ /dev/null @@ -1,235 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.table.api.scala.stream.sql - -import org.apache.calcite.rel.logical.LogicalJoin -import org.apache.flink.api.scala._ -import org.apache.flink.table.api.TableException -import org.apache.flink.table.api.scala._ -import org.apache.flink.table.calcite.FlinkTypeFactory -import org.apache.flink.table.plan.schema.RowSchema -import org.apache.flink.table.runtime.join.WindowJoinUtil -import org.apache.flink.table.utils.TableTestUtil._ -import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase} -import org.junit.Assert._ -import org.junit.Test - -class JoinTest extends TableTestBase { - private val streamUtil: StreamTableTestUtil = streamTestUtil() - streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c.rowtime, 'proctime.proctime) - streamUtil.addTable[(Int, String, Long)]("MyTable2", 'a, 'b, 'c.rowtime, 'proctime.proctime) - - @Test - def testProcessingTimeInnerJoin() = { - - val sqlQuery = "SELECT t1.a, t2.b " + - "FROM MyTable as t1 join MyTable2 as t2 on t1.a = t2.a and " + - "t1.proctime between t2.proctime - interval '1' hour and t2.proctime + interval '1' hour" - val expected = - unaryNode( - "DataStreamCalc", - binaryNode( - "DataStreamWindowJoin", - unaryNode( - "DataStreamCalc", - streamTableNode(0), - term("select", "a", "proctime") - ), - unaryNode( - "DataStreamCalc", - streamTableNode(1), - term("select", "a", "b", "proctime") - ), - term("where", - "AND(=(a, a0), >=(proctime, -(proctime0, 3600000)), " + - "<=(proctime, DATETIME_PLUS(proctime0, 3600000)))"), - term("join", "a, proctime, a0, b, proctime0"), - term("joinType", "InnerJoin") - ), - term("select", "a", "b") - ) - - streamUtil.verifySql(sqlQuery, expected) - } - - /** There should exist time conditions **/ - @Test(expected = classOf[TableException]) - def testWindowJoinUnExistTimeCondition() = { - val sql = "SELECT t2.a from MyTable as t1 join MyTable2 as t2 on t1.a = t2.a" - streamUtil.verifySql(sql, "n/a") - } - - /** There should exist exactly two time conditions **/ - @Test(expected = classOf[TableException]) - def testWindowJoinSingleTimeCondition() = { - val sql = "SELECT t2.a from MyTable as t1 join MyTable2 as t2 on t1.a = t2.a" + - " and t1.proctime > t2.proctime - interval '5' second" - streamUtil.verifySql(sql, "n/a") - } - - /** Both time attributes in a join condition must be of the same type **/ - @Test(expected = classOf[TableException]) - def testWindowJoinDiffTimeIndicator() = { - val sql = "SELECT t2.a from MyTable as t1 join MyTable2 as t2 on t1.a = t2.a" + - " and t1.proctime > t2.proctime - interval '5' second " + - " and t1.proctime < t2.c + interval '5' second" - streamUtil.verifySql(sql, "n/a") - } - - /** The time conditions should be an And condition **/ - @Test(expected = classOf[TableException]) - def testWindowJoinNotCnfCondition() = { - val sql = "SELECT t2.a from MyTable as t1 join MyTable2 as t2 on t1.a = t2.a" + - " and (t1.proctime > t2.proctime - interval '5' second " + - " or t1.proctime < t2.c + interval '5' second)" - streamUtil.verifySql(sql, "n/a") - } - - @Test - def testJoinTimeBoundary(): Unit = { - verifyTimeBoundary( - "t1.proctime between t2.proctime - interval '1' hour " + - "and t2.proctime + interval '1' hour", - -3600000, - 3600000, - "proctime") - - verifyTimeBoundary( - "t1.proctime > t2.proctime - interval '1' second and " + - "t1.proctime < t2.proctime + interval '1' second", - -999, - 999, - "proctime") - - verifyTimeBoundary( - "t1.c >= t2.c - interval '1' second and " + - "t1.c <= t2.c + interval '1' second", - -1000, - 1000, - "rowtime") - - verifyTimeBoundary( - "t1.c >= t2.c and " + - "t1.c <= t2.c + interval '1' second", - 0, - 1000, - "rowtime") - - verifyTimeBoundary( - "t1.c >= t2.c + interval '1' second and " + - "t1.c <= t2.c + interval '10' second", - 1000, - 10000, - "rowtime") - - verifyTimeBoundary( - "t2.c - interval '1' second <= t1.c and " + - "t2.c + interval '10' second >= t1.c", - -1000, - 10000, - "rowtime") - - verifyTimeBoundary( - "t1.c - interval '2' second >= t2.c + interval '1' second -" + - "interval '10' second and " + - "t1.c <= t2.c + interval '10' second", - -7000, - 10000, - "rowtime") - - verifyTimeBoundary( - "t1.c >= t2.c - interval '10' second and " + - "t1.c <= t2.c - interval '5' second", - -10000, - -5000, - "rowtime") - } - - @Test - def testJoinRemainConditionConvert(): Unit = { - streamUtil.addTable[(Int, Long, Int)]("MyTable3", 'a, 'b.rowtime, 'c, 'proctime.proctime) - streamUtil.addTable[(Int, Long, Int)]("MyTable4", 'a, 'b.rowtime, 'c, 'proctime.proctime) - val query = - "SELECT t1.a, t2.c FROM MyTable3 as t1 join MyTable4 as t2 on t1.a = t2.a and " + - "t1.b >= t2.b - interval '10' second and t1.b <= t2.b - interval '5' second and " + - "t1.c > t2.c" - verifyRemainConditionConvert( - query, - ">($1, $3)") - - val query1 = - "SELECT t1.a, t2.c FROM MyTable3 as t1 join MyTable4 as t2 on t1.a = t2.a and " + - "t1.b >= t2.b - interval '10' second and t1.b <= t2.b - interval '5' second " - verifyRemainConditionConvert( - query1, - "") - - streamUtil.addTable[(Int, Long, Int)]("MyTable5", 'a, 'b, 'c, 'proctime.proctime) - streamUtil.addTable[(Int, Long, Int)]("MyTable6", 'a, 'b, 'c, 'proctime.proctime) - val query2 = - "SELECT t1.a, t2.c FROM MyTable5 as t1 join MyTable6 as t2 on t1.a = t2.a and " + - "t1.proctime >= t2.proctime - interval '10' second " + - "and t1.proctime <= t2.proctime - interval '5' second and " + - "t1.c > t2.c" - verifyRemainConditionConvert( - query2, - ">($2, $5)") - } - - def verifyTimeBoundary( - timeSql: String, - expLeftSize: Long, - expRightSize: Long, - expTimeType: String) = { - val query = - "SELECT t1.a, t2.b FROM MyTable as t1 join MyTable2 as t2 on t1.a = t2.a and " + timeSql - - val resultTable = streamUtil.tEnv.sql(query) - val relNode = resultTable.getRelNode - val joinNode = relNode.getInput(0).asInstanceOf[LogicalJoin] - val rexNode = joinNode.getCondition - val (isRowTime, lowerBound, upperBound, conditionWithoutTime) = - WindowJoinUtil.analyzeTimeBoundary(rexNode, 4, new RowSchema(joinNode.getRowType), - joinNode.getCluster.getRexBuilder, streamUtil.tEnv.getConfig) - - val timeTypeStr = - if (isRowTime) "rowtime" - else "proctime" - assertEquals(expLeftSize, lowerBound) - assertEquals(expRightSize, upperBound) - assertEquals(expTimeType, timeTypeStr) - } - - def verifyRemainConditionConvert( - query: String, - expectCondStr: String) = { - - val resultTable = streamUtil.tEnv.sql(query) - val relNode = resultTable.getRelNode - val joinNode = relNode.getInput(0).asInstanceOf[LogicalJoin] - val joinInfo = joinNode.analyzeCondition - val rexNode = joinInfo.getRemaining(joinNode.getCluster.getRexBuilder) - val (isRowTime, lowerBound, upperBound, remainCondition) = - WindowJoinUtil.analyzeTimeBoundary(rexNode, 4, new RowSchema(joinNode.getRowType), - joinNode.getCluster.getRexBuilder, streamUtil.tEnv.getConfig) - - val actual: String = remainCondition.getOrElse("").toString - - assertEquals(expectCondStr, actual) - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala new file mode 100644 index 0000000..640fd26 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.api.stream.sql + +import org.apache.calcite.rel.logical.LogicalJoin +import org.apache.flink.api.scala._ +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.runtime.join.WindowJoinUtil +import org.apache.flink.table.utils.TableTestUtil._ +import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase} +import org.junit.Assert._ +import org.junit.Test + +class JoinTest extends TableTestBase { + private val streamUtil: StreamTableTestUtil = streamTestUtil() + streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c.rowtime, 'proctime.proctime) + streamUtil.addTable[(Int, String, Long)]("MyTable2", 'a, 'b, 'c.rowtime, 'proctime.proctime) + + @Test + def testProcessingTimeInnerJoinWithOnClause() = { + + val sqlQuery = + """ + |SELECT t1.a, t2.b + |FROM MyTable t1 JOIN MyTable2 t2 ON + | t1.a = t2.a AND + | t1.proctime BETWEEN t2.proctime - INTERVAL '1' HOUR AND t2.proctime + INTERVAL '1' HOUR + |""".stripMargin + + val expected = + unaryNode( + "DataStreamCalc", + binaryNode( + "DataStreamWindowJoin", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "a", "proctime") + ), + unaryNode( + "DataStreamCalc", + streamTableNode(1), + term("select", "a", "b", "proctime") + ), + term("where", + "AND(=(a, a0), >=(proctime, -(proctime0, 3600000)), " + + "<=(proctime, DATETIME_PLUS(proctime0, 3600000)))"), + term("join", "a, proctime, a0, b, proctime0"), + term("joinType", "InnerJoin") + ), + term("select", "a", "b") + ) + + streamUtil.verifySql(sqlQuery, expected) + } + + @Test + def testProcessingTimeInnerJoinWithWhereClause() = { + + val sqlQuery = + """ + |SELECT t1.a, t2.b + |FROM MyTable t1, MyTable2 t2 + |WHERE t1.a = t2.a AND + | t1.proctime BETWEEN t2.proctime - INTERVAL '1' HOUR AND t2.proctime + INTERVAL '1' HOUR + |""".stripMargin + + val expected = + unaryNode( + "DataStreamCalc", + binaryNode( + "DataStreamWindowJoin", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "a", "proctime") + ), + unaryNode( + "DataStreamCalc", + streamTableNode(1), + term("select", "a", "b", "proctime") + ), + term("where", + "AND(=(a, a0), >=(proctime, -(proctime0, 3600000)), " + + "<=(proctime, DATETIME_PLUS(proctime0, 3600000)))"), + term("join", "a, proctime, a0, b, proctime0"), + term("joinType", "InnerJoin") + ), + term("select", "a", "b0 AS b") + ) + + streamUtil.verifySql(sqlQuery, expected) + } + + @Test + def testJoinTimeBoundary(): Unit = { + verifyTimeBoundary( + "t1.proctime between t2.proctime - interval '1' hour " + + "and t2.proctime + interval '1' hour", + -3600000, + 3600000, + "proctime") + + verifyTimeBoundary( + "t1.proctime > t2.proctime - interval '1' second and " + + "t1.proctime < t2.proctime + interval '1' second", + -999, + 999, + "proctime") + + verifyTimeBoundary( + "t1.c >= t2.c - interval '1' second and " + + "t1.c <= t2.c + interval '1' second", + -1000, + 1000, + "rowtime") + + verifyTimeBoundary( + "t1.c >= t2.c and " + + "t1.c <= t2.c + interval '1' second", + 0, + 1000, + "rowtime") + + verifyTimeBoundary( + "t1.c >= t2.c + interval '1' second and " + + "t1.c <= t2.c + interval '10' second", + 1000, + 10000, + "rowtime") + + verifyTimeBoundary( + "t2.c - interval '1' second <= t1.c and " + + "t2.c + interval '10' second >= t1.c", + -1000, + 10000, + "rowtime") + + verifyTimeBoundary( + "t1.c - interval '2' second >= t2.c + interval '1' second -" + + "interval '10' second and " + + "t1.c <= t2.c + interval '10' second", + -7000, + 10000, + "rowtime") + + verifyTimeBoundary( + "t1.c >= t2.c - interval '10' second and " + + "t1.c <= t2.c - interval '5' second", + -10000, + -5000, + "rowtime") + } + + @Test + def testJoinRemainConditionConvert(): Unit = { + streamUtil.addTable[(Int, Long, Int)]("MyTable3", 'a, 'b.rowtime, 'c, 'proctime.proctime) + streamUtil.addTable[(Int, Long, Int)]("MyTable4", 'a, 'b.rowtime, 'c, 'proctime.proctime) + val query = + "SELECT t1.a, t2.c FROM MyTable3 as t1 join MyTable4 as t2 on t1.a = t2.a and " + + "t1.b >= t2.b - interval '10' second and t1.b <= t2.b - interval '5' second and " + + "t1.c > t2.c" + verifyRemainConditionConvert( + query, + ">($2, $6)") + + val query1 = + "SELECT t1.a, t2.c FROM MyTable3 as t1 join MyTable4 as t2 on t1.a = t2.a and " + + "t1.b >= t2.b - interval '10' second and t1.b <= t2.b - interval '5' second " + verifyRemainConditionConvert( + query1, + "") + + streamUtil.addTable[(Int, Long, Int)]("MyTable5", 'a, 'b, 'c, 'proctime.proctime) + streamUtil.addTable[(Int, Long, Int)]("MyTable6", 'a, 'b, 'c, 'proctime.proctime) + val query2 = + "SELECT t1.a, t2.c FROM MyTable5 as t1 join MyTable6 as t2 on t1.a = t2.a and " + + "t1.proctime >= t2.proctime - interval '10' second " + + "and t1.proctime <= t2.proctime - interval '5' second and " + + "t1.c > t2.c" + verifyRemainConditionConvert( + query2, + ">($2, $6)") + } + + private def verifyTimeBoundary( + timeSql: String, + expLeftSize: Long, + expRightSize: Long, + expTimeType: String): Unit = { + val query = + "SELECT t1.a, t2.b FROM MyTable as t1 join MyTable2 as t2 on t1.a = t2.a and " + timeSql + + val resultTable = streamUtil.tableEnv.sql(query) + val relNode = resultTable.getRelNode + val joinNode = relNode.getInput(0).asInstanceOf[LogicalJoin] + val rexNode = joinNode.getCondition + val (windowBounds, _) = + WindowJoinUtil.extractWindowBoundsFromPredicate( + rexNode, + 4, + joinNode.getRowType, + joinNode.getCluster.getRexBuilder, + streamUtil.tableEnv.getConfig) + + val timeTypeStr = + if (windowBounds.get.isEventTime) "rowtime" + else "proctime" + assertEquals(expLeftSize, windowBounds.get.leftLowerBound) + assertEquals(expRightSize, windowBounds.get.leftUpperBound) + assertEquals(expTimeType, timeTypeStr) + } + + private def verifyRemainConditionConvert( + query: String, + expectCondStr: String): Unit = { + + val resultTable = streamUtil.tableEnv.sql(query) + val relNode = resultTable.getRelNode + val joinNode = relNode.getInput(0).asInstanceOf[LogicalJoin] + val joinInfo = joinNode.analyzeCondition + val rexNode = joinInfo.getRemaining(joinNode.getCluster.getRexBuilder) + val (_, remainCondition) = + WindowJoinUtil.extractWindowBoundsFromPredicate( + rexNode, + 4, + joinNode.getRowType, + joinNode.getCluster.getRexBuilder, + streamUtil.tableEnv.getConfig) + + val actual: String = remainCondition.getOrElse("").toString + + assertEquals(expectCondStr, actual) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/JoinValidationTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/JoinValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/JoinValidationTest.scala new file mode 100644 index 0000000..9cce37e --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/JoinValidationTest.scala @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.stream.sql.validation + +import org.apache.flink.api.scala._ +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.api.TableException +import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase} +import org.junit.Test + +class JoinValidationTest extends TableTestBase { + + private val streamUtil: StreamTableTestUtil = streamTestUtil() + streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c.rowtime, 'proctime.proctime) + streamUtil.addTable[(Int, String, Long)]("MyTable2", 'a, 'b, 'c.rowtime, 'proctime.proctime) + + /** There should exist time conditions **/ + @Test(expected = classOf[TableException]) + def testWindowJoinUnExistTimeCondition() = { + val sql = + """ + |SELECT t2.a + |FROM MyTable t1 JOIN MyTable2 t2 ON t1.a = t2.a""".stripMargin + streamUtil.verifySql(sql, "n/a") + } + + /** There should exist exactly two time conditions **/ + @Test(expected = classOf[TableException]) + def testWindowJoinSingleTimeCondition() = { + val sql = + """ + |SELECT t2.a + |FROM MyTable t1 JOIN MyTable2 t2 ON + | t1.a = t2.a AND + | t1.proctime > t2.proctime - INTERVAL '5' SECOND""".stripMargin + streamUtil.verifySql(sql, "n/a") + } + + /** Both time attributes in a join condition must be of the same type **/ + @Test(expected = classOf[TableException]) + def testWindowJoinDiffTimeIndicator() = { + val sql = + """ + |SELECT t2.a FROM + |MyTable t1 JOIN MyTable2 t2 ON + | t1.a = t2.a AND + | t1.proctime > t2.proctime - INTERVAL '5' SECOND AND + | t1.proctime < t2.c + INTERVAL '5' SECOND""".stripMargin + streamUtil.verifySql(sql, "n/a") + } + + /** The time conditions should be an And condition **/ + @Test(expected = classOf[TableException]) + def testWindowJoinNotCnfCondition() = { + val sql = + """ + |SELECT t2.a + |FROM MyTable t1 JOIN MyTable2 t2 ON + | t1.a = t2.a AND + | (t1.proctime > t2.proctime - INTERVAL '5' SECOND OR + | t1.proctime < t2.c + INTERVAL '5' SECOND)""".stripMargin + streamUtil.verifySql(sql, "n/a") + } + + /** Validates that no rowtime attribute is in the output schema **/ + @Test(expected = classOf[TableException]) + def testNoRowtimeAttributeInResult(): Unit = { + val sql = + """ + |SELECT * + |FROM MyTable t1, MyTable2 t2 + |WHERE t1.a = t2.a AND + | t1.proctime BETWEEN t2.proctime - INTERVAL '5' SECOND AND t2.proctime + | """.stripMargin + + streamUtil.verifySql(sql, "n/a") + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala index 7885160..90c8ea4 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala @@ -85,7 +85,7 @@ class TimeIndicatorConversionTest extends TableTestBase { "DataStreamCalc", streamTableNode(0), term("select", "TIME_MATERIALIZATION(rowtime) AS rowtime"), - term("where", ">(TIME_MATERIALIZATION(rowtime), 1990-12-02 12:11:11)") + term("where", ">(rowtime, 1990-12-02 12:11:11)") ) util.verifyTable(result, expected) http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala index c008ed3..6c24c5d 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala @@ -20,20 +20,18 @@ package org.apache.flink.table.runtime.harness import java.util.concurrent.ConcurrentLinkedQueue import java.lang.{Integer => JInt} -import org.apache.flink.api.common.functions.FlatJoinFunction import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator import org.apache.flink.streaming.runtime.streamrecord.StreamRecord -import org.apache.flink.streaming.util.{KeyedTwoInputStreamOperatorTestHarness, TwoInputStreamOperatorTestHarness} -import org.apache.flink.table.codegen.GeneratedFunction +import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness import org.apache.flink.table.runtime.harness.HarnessTestBase.{RowResultSortComparator, TupleRowKeySelector} import org.apache.flink.table.runtime.join.ProcTimeWindowInnerJoin import org.apache.flink.table.runtime.types.CRow import org.apache.flink.types.Row import org.junit.Test - +import org.junit.Assert.{assertEquals, assertTrue} class JoinHarnessTest extends HarnessTestBase{ @@ -89,7 +87,7 @@ class JoinHarnessTest extends HarnessTestBase{ new TupleRowKeySelector[Integer](0), new TupleRowKeySelector[Integer](0), BasicTypeInfo.INT_TYPE_INFO, - 1,1,0) + 1, 1, 0) testHarness.open() @@ -97,16 +95,16 @@ class JoinHarnessTest extends HarnessTestBase{ testHarness.setProcessingTime(1) testHarness.processElement1(new StreamRecord( CRow(Row.of(1: JInt, "aaa"), true), 1)) - assert(testHarness.numProcessingTimeTimers() == 1) + assertEquals(1, testHarness.numProcessingTimeTimers()) testHarness.setProcessingTime(2) testHarness.processElement1(new StreamRecord( CRow(Row.of(2: JInt, "bbb"), true), 2)) - assert(testHarness.numProcessingTimeTimers() == 2) + assertEquals(2, testHarness.numProcessingTimeTimers()) testHarness.setProcessingTime(3) testHarness.processElement1(new StreamRecord( CRow(Row.of(1: JInt, "aaa2"), true), 3)) - assert(testHarness.numKeyedStateEntries() == 4) - assert(testHarness.numProcessingTimeTimers() == 2) + assertEquals(4, testHarness.numKeyedStateEntries()) + assertEquals(2, testHarness.numProcessingTimeTimers()) // right stream input and output normally testHarness.processElement2(new StreamRecord( @@ -114,20 +112,20 @@ class JoinHarnessTest extends HarnessTestBase{ testHarness.setProcessingTime(4) testHarness.processElement2(new StreamRecord( CRow(Row.of(2: JInt, "Hello1"), true), 4)) - assert(testHarness.numKeyedStateEntries() == 8) - assert(testHarness.numProcessingTimeTimers() == 4) + assertEquals(8, testHarness.numKeyedStateEntries()) + assertEquals(4, testHarness.numProcessingTimeTimers()) // expired left stream record at timestamp 1 testHarness.setProcessingTime(12) - assert(testHarness.numKeyedStateEntries() == 8) - assert(testHarness.numProcessingTimeTimers() == 4) + assertEquals(8, testHarness.numKeyedStateEntries()) + assertEquals(4, testHarness.numProcessingTimeTimers()) testHarness.processElement2(new StreamRecord( CRow(Row.of(1: JInt, "Hi2"), true), 12)) // expired right stream record at timestamp 4 and all left stream testHarness.setProcessingTime(25) - assert(testHarness.numKeyedStateEntries() == 2) - assert(testHarness.numProcessingTimeTimers() == 1) + assertEquals(2, testHarness.numKeyedStateEntries()) + assertEquals(1, testHarness.numProcessingTimeTimers()) testHarness.processElement1(new StreamRecord( CRow(Row.of(1: JInt, "aaa3"), true), 25)) testHarness.processElement1(new StreamRecord( @@ -136,9 +134,9 @@ class JoinHarnessTest extends HarnessTestBase{ CRow(Row.of(2: JInt, "Hello2"), true), 25)) testHarness.setProcessingTime(45) - assert(testHarness.numKeyedStateEntries() > 0) + assertTrue(testHarness.numKeyedStateEntries() > 0) testHarness.setProcessingTime(46) - assert(testHarness.numKeyedStateEntries() == 0) + assertEquals(0, testHarness.numKeyedStateEntries()) val result = testHarness.getOutput val expectedOutput = new ConcurrentLinkedQueue[Object]() @@ -175,7 +173,7 @@ class JoinHarnessTest extends HarnessTestBase{ new TupleRowKeySelector[Integer](0), new TupleRowKeySelector[Integer](0), BasicTypeInfo.INT_TYPE_INFO, - 1,1,0) + 1, 1, 0) testHarness.open() @@ -188,37 +186,38 @@ class JoinHarnessTest extends HarnessTestBase{ testHarness.setProcessingTime(3) testHarness.processElement1(new StreamRecord( CRow(Row.of(1: JInt, "aaa3"), true), 3)) - assert(testHarness.numKeyedStateEntries() == 4) - assert(testHarness.numProcessingTimeTimers() == 2) + assertEquals(4, testHarness.numKeyedStateEntries()) + assertEquals(2, testHarness.numProcessingTimeTimers()) // Do not store b elements // not meet a.proctime <= b.proctime - 5 testHarness.processElement2(new StreamRecord( CRow(Row.of(1: JInt, "bbb3"), true), 3)) - assert(testHarness.numKeyedStateEntries() == 4) - assert(testHarness.numProcessingTimeTimers() == 2) + assertEquals(4, testHarness.numKeyedStateEntries()) + assertEquals(2, testHarness.numProcessingTimeTimers()) // meet a.proctime <= b.proctime - 5 testHarness.setProcessingTime(7) testHarness.processElement2(new StreamRecord( CRow(Row.of(2: JInt, "bbb7"), true), 7)) - assert(testHarness.numKeyedStateEntries() == 4) - assert(testHarness.numProcessingTimeTimers() == 2) + assertEquals(4, testHarness.numKeyedStateEntries()) + assertEquals(2, testHarness.numProcessingTimeTimers()) // expire record of stream a at timestamp 1 testHarness.setProcessingTime(12) - assert(testHarness.numKeyedStateEntries() == 4) - assert(testHarness.numProcessingTimeTimers() == 2) + assertEquals(4, testHarness.numKeyedStateEntries()) + assertEquals(2, testHarness.numProcessingTimeTimers()) testHarness.processElement2(new StreamRecord( CRow(Row.of(1: JInt, "bbb12"), true), 12)) testHarness.setProcessingTime(13) - assert(testHarness.numKeyedStateEntries() == 2) - assert(testHarness.numProcessingTimeTimers() == 1) + assertEquals(2, testHarness.numKeyedStateEntries()) + assertEquals(1, testHarness.numProcessingTimeTimers()) - testHarness.setProcessingTime(14) - assert(testHarness.numKeyedStateEntries() == 0) - assert(testHarness.numProcessingTimeTimers() == 0) + // state must be cleaned after the window timer interval has passed without new rows. + testHarness.setProcessingTime(23) + assertEquals(0, testHarness.numKeyedStateEntries()) + assertEquals(0, testHarness.numProcessingTimeTimers()) val result = testHarness.getOutput val expectedOutput = new ConcurrentLinkedQueue[Object]() http://git-wip-us.apache.org/repos/asf/flink/blob/471345c0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala new file mode 100644 index 0000000..ab7925b --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.stream.sql + +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.table.api.TableEnvironment +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.runtime.utils.{StreamITCase, StreamingWithStateTestBase} +import org.apache.flink.types.Row +import org.junit._ + +import scala.collection.mutable + +class JoinITCase extends StreamingWithStateTestBase { + + /** test process time inner join **/ + @Test + def testProcessTimeInnerJoin(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + env.setStateBackend(getStateBackend) + StreamITCase.clear + env.setParallelism(1) + + val sqlQuery = "SELECT t2.a, t2.c, t1.c from T1 as t1 join T2 as t2 on t1.a = t2.a and " + + "t1.proctime between t2.proctime - interval '5' second and t2.proctime + interval '5' second" + + val data1 = new mutable.MutableList[(Int, Long, String)] + data1.+=((1, 1L, "Hi1")) + data1.+=((1, 2L, "Hi2")) + data1.+=((1, 5L, "Hi3")) + data1.+=((2, 7L, "Hi5")) + data1.+=((1, 9L, "Hi6")) + data1.+=((1, 8L, "Hi8")) + + val data2 = new mutable.MutableList[(Int, Long, String)] + data2.+=((1, 1L, "HiHi")) + data2.+=((2, 2L, "HeHe")) + + val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime) + val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime) + + tEnv.registerTable("T1", t1) + tEnv.registerTable("T2", t2) + + val result = tEnv.sql(sqlQuery).toAppendStream[Row] + result.addSink(new StreamITCase.StringSink[Row]) + env.execute() + } + + /** test process time inner join with other condition **/ + @Test + def testProcessTimeInnerJoinWithOtherCondition(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + env.setStateBackend(getStateBackend) + StreamITCase.clear + env.setParallelism(1) + + val sqlQuery = "SELECT t2.a, t2.c, t1.c from T1 as t1 join T2 as t2 on t1.a = t2.a and " + + "t1.proctime between t2.proctime - interval '5' second " + + "and t2.proctime + interval '5' second " + + "and t1.b > t2.b and t1.b + t2.b < 14" + + val data1 = new mutable.MutableList[(String, Long, String)] + data1.+=(("1", 1L, "Hi1")) + data1.+=(("1", 2L, "Hi2")) + data1.+=(("1", 5L, "Hi3")) + data1.+=(("2", 7L, "Hi5")) + data1.+=(("1", 9L, "Hi6")) + data1.+=(("1", 8L, "Hi8")) + + val data2 = new mutable.MutableList[(String, Long, String)] + data2.+=(("1", 5L, "HiHi")) + data2.+=(("2", 2L, "HeHe")) + + val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime) + val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime) + + tEnv.registerTable("T1", t1) + tEnv.registerTable("T2", t2) + + val result = tEnv.sql(sqlQuery).toAppendStream[Row] + result.addSink(new StreamITCase.StringSink[Row]) + env.execute() + } + +} +