[FLINK-3656] [table] Add test base for logical unit testing. This closes #2595
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2061852a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2061852a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2061852a Branch: refs/heads/master Commit: 2061852ad0993e35e7d6667ad5ca7028af16a3b7 Parents: cf22006 Author: twalthr <twal...@apache.org> Authored: Tue Oct 4 18:32:52 2016 +0200 Committer: Fabian Hueske <fhue...@apache.org> Committed: Mon Oct 10 20:49:12 2016 +0200 ---------------------------------------------------------------------- .../api/table/ExpressionReductionTest.scala | 413 ++++++++++--------- .../flink/api/table/utils/TableTestBase.scala | 148 +++++++ 2 files changed, 367 insertions(+), 194 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/2061852a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala index 925a818..2d4694e 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala @@ -17,48 +17,19 @@ */ package org.apache.flink.api.table -import org.apache.flink.api.java.{DataSet => JDataSet} +import org.apache.flink.api.scala._ import org.apache.flink.api.scala.table._ -import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, createTypeInformation} -import org.apache.flink.streaming.api.datastream.{DataStream => JDataStream} -import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} -import 
org.junit.Assert._ +import org.apache.flink.api.table.utils.TableTestBase +import org.apache.flink.api.table.utils.TableTestUtil._ import org.junit.Test -import org.mockito.Mockito.{mock, when} -class ExpressionReductionTest { - private def mockBatchTableEnvironment(): BatchTableEnvironment = { - val env = mock(classOf[ExecutionEnvironment]) - val tEnv = TableEnvironment.getTableEnvironment(env) - - val ds = mock(classOf[DataSet[(Int, Long, String)]]) - val jDs = mock(classOf[JDataSet[(Int, Long, String)]]) - when(ds.javaSet).thenReturn(jDs) - when(jDs.getType).thenReturn(createTypeInformation[(Int, Long, String)]) - - val t = ds.toTable(tEnv).as('a, 'b, 'c) - tEnv.registerTable("MyTable", t) - tEnv - } - - private def mockStreamTableEnvironment(): StreamTableEnvironment = { - val env = mock(classOf[StreamExecutionEnvironment]) - val tEnv = TableEnvironment.getTableEnvironment(env) - - val ds = mock(classOf[DataStream[(Int, Long, String)]]) - val jDs = mock(classOf[JDataStream[(Int, Long, String)]]) - when(ds.javaStream).thenReturn(jDs) - when(jDs.getType).thenReturn(createTypeInformation[(Int, Long, String)]) - - val t = ds.toTable(tEnv).as('a, 'b, 'c) - tEnv.registerTable("MyTable", t) - tEnv - } +class ExpressionReductionTest extends TableTestBase { @Test def testReduceCalcExpressionForBatchSQL(): Unit = { - val tEnv = mockBatchTableEnvironment() + val util = batchTestUtil() + util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c) val sqlQuery = "SELECT " + "(3+4)+a, " + @@ -76,29 +47,34 @@ class ExpressionReductionTest { "CAST(TRUE AS VARCHAR) || 'X'" + "FROM MyTable WHERE a>(1+7)" - val table = tEnv.sql(sqlQuery) - - val optimized = tEnv.optimize(table.getRelNode) - val optimizedString = optimized.toString - assertTrue(optimizedString.contains(">(_1, 8)")) - assertTrue(optimizedString.contains("+(7, _1) AS EXPR$0")) - assertTrue(optimizedString.contains("+(_2, 3) AS EXPR$1")) - assertTrue(optimizedString.contains("'b' AS EXPR$2")) - 
assertTrue(optimizedString.contains("'STRING' AS EXPR$3")) - assertTrue(optimizedString.contains("'teststring' AS EXPR$4")) - assertTrue(optimizedString.contains("null AS EXPR$5")) - assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS EXPR$6")) - assertTrue(optimizedString.contains("19 AS EXPR$7")) - assertTrue(optimizedString.contains("false AS EXPR$8")) - assertTrue(optimizedString.contains("true AS EXPR$9")) - assertTrue(optimizedString.contains("2 AS EXPR$10")) - assertTrue(optimizedString.contains("true AS EXPR$11")) - assertTrue(optimizedString.contains("'TRUEX' AS EXPR$12")) + val expected = unaryNode( + "DataSetCalc", + batchTableNode(0), + term("select", + "+(7, a) AS EXPR$0", + "+(b, 3) AS EXPR$1", + "'b' AS EXPR$2", + "'STRING' AS EXPR$3", + "'teststring' AS EXPR$4", + "null AS EXPR$5", + "1990-10-24 23:00:01 AS EXPR$6", + "19 AS EXPR$7", + "false AS EXPR$8", + "true AS EXPR$9", + "2 AS EXPR$10", + "true AS EXPR$11", + "'TRUEX' AS EXPR$12" + ), + term("where", ">(a, 8)") + ) + + util.verifySql(sqlQuery, expected) } @Test def testReduceProjectExpressionForBatchSQL(): Unit = { - val tEnv = mockBatchTableEnvironment() + val util = batchTestUtil() + util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c) val sqlQuery = "SELECT " + "(3+4)+a, " + @@ -116,46 +92,54 @@ class ExpressionReductionTest { "CAST(TRUE AS VARCHAR) || 'X'" + "FROM MyTable" - val table = tEnv.sql(sqlQuery) - - val optimized = tEnv.optimize(table.getRelNode) - val optimizedString = optimized.toString - assertTrue(optimizedString.contains("+(7, _1) AS EXPR$0")) - assertTrue(optimizedString.contains("+(_2, 3) AS EXPR$1")) - assertTrue(optimizedString.contains("'b' AS EXPR$2")) - assertTrue(optimizedString.contains("'STRING' AS EXPR$3")) - assertTrue(optimizedString.contains("'teststring' AS EXPR$4")) - assertTrue(optimizedString.contains("null AS EXPR$5")) - assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS EXPR$6")) - assertTrue(optimizedString.contains("19 AS EXPR$7")) 
- assertTrue(optimizedString.contains("false AS EXPR$8")) - assertTrue(optimizedString.contains("true AS EXPR$9")) - assertTrue(optimizedString.contains("2 AS EXPR$10")) - assertTrue(optimizedString.contains("true AS EXPR$11")) - assertTrue(optimizedString.contains("'TRUEX' AS EXPR$12")) + val expected = unaryNode( + "DataSetCalc", + batchTableNode(0), + term("select", + "+(7, a) AS EXPR$0", + "+(b, 3) AS EXPR$1", + "'b' AS EXPR$2", + "'STRING' AS EXPR$3", + "'teststring' AS EXPR$4", + "null AS EXPR$5", + "1990-10-24 23:00:01 AS EXPR$6", + "19 AS EXPR$7", + "false AS EXPR$8", + "true AS EXPR$9", + "2 AS EXPR$10", + "true AS EXPR$11", + "'TRUEX' AS EXPR$12" + ) + ) + + util.verifySql(sqlQuery, expected) } @Test def testReduceFilterExpressionForBatchSQL(): Unit = { - val tEnv = mockBatchTableEnvironment() + val util = batchTestUtil() + util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c) val sqlQuery = "SELECT " + "*" + "FROM MyTable WHERE a>(1+7)" - val table = tEnv.sql(sqlQuery) + val expected = unaryNode( + "DataSetCalc", + batchTableNode(0), + term("select", "a", "b", "c"), + term("where", ">(a, 8)") + ) - val optimized = tEnv.optimize(table.getRelNode) - val optimizedString = optimized.toString - assertTrue(optimizedString.contains(">(_1, 8)")) + util.verifySql(sqlQuery, expected) } @Test def testReduceCalcExpressionForBatchTableAPI(): Unit = { - val tEnv = mockBatchTableEnvironment() + val util = batchTestUtil() + val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c) - val table = tEnv - .scan("MyTable") + val result = table .where('a > (1 + 7)) .select((3 + 4).toExpr + 6, (11 === 1) ? 
("a", "b"), @@ -167,27 +151,32 @@ class ExpressionReductionTest { 2.5.toExpr.floor(), true.cast(Types.STRING) + "X") - - val optimized = tEnv.optimize(table.getRelNode) - val optimizedString = optimized.toString - assertTrue(optimizedString.contains(">(_1, 8)")) - assertTrue(optimizedString.contains("13 AS _c0")) - assertTrue(optimizedString.contains("'b' AS _c1")) - assertTrue(optimizedString.contains("'STRING' AS _c2")) - assertTrue(optimizedString.contains("'teststring' AS _c3")) - assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS _c4")) - assertTrue(optimizedString.contains("false AS _c5")) - assertTrue(optimizedString.contains("true AS _c6")) - assertTrue(optimizedString.contains("2E0 AS _c7")) - assertTrue(optimizedString.contains("'TRUEX' AS _c8")) + val expected = unaryNode( + "DataSetCalc", + batchTableNode(0), + term("select", + "13 AS _c0", + "'b' AS _c1", + "'STRING' AS _c2", + "'teststring' AS _c3", + "1990-10-24 23:00:01 AS _c4", + "false AS _c5", + "true AS _c6", + "2E0 AS _c7", + "'TRUEX' AS _c8" + ), + term("where", ">(a, 8)") + ) + + util.verifyTable(result, expected) } @Test def testReduceProjectExpressionForBatchTableAPI(): Unit = { - val tEnv = mockBatchTableEnvironment() + val util = batchTestUtil() + val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c) - val table = tEnv - .scan("MyTable") + val result = table .select((3 + 4).toExpr + 6, (11 === 1) ? 
("a", "b"), " STRING ".trim, @@ -198,36 +187,47 @@ class ExpressionReductionTest { 2.5.toExpr.floor(), true.cast(Types.STRING) + "X") - - val optimized = tEnv.optimize(table.getRelNode) - val optimizedString = optimized.toString - assertTrue(optimizedString.contains("13 AS _c0")) - assertTrue(optimizedString.contains("'b' AS _c1")) - assertTrue(optimizedString.contains("'STRING' AS _c2")) - assertTrue(optimizedString.contains("'teststring' AS _c3")) - assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS _c4")) - assertTrue(optimizedString.contains("false AS _c5")) - assertTrue(optimizedString.contains("true AS _c6")) - assertTrue(optimizedString.contains("2E0 AS _c7")) - assertTrue(optimizedString.contains("'TRUEX' AS _c8")) + val expected = unaryNode( + "DataSetCalc", + batchTableNode(0), + term("select", + "13 AS _c0", + "'b' AS _c1", + "'STRING' AS _c2", + "'teststring' AS _c3", + "1990-10-24 23:00:01 AS _c4", + "false AS _c5", + "true AS _c6", + "2E0 AS _c7", + "'TRUEX' AS _c8" + ) + ) + + util.verifyTable(result, expected) } @Test def testReduceFilterExpressionForBatchTableAPI(): Unit = { - val tEnv = mockBatchTableEnvironment() + val util = batchTestUtil() + val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c) - val table = tEnv - .scan("MyTable") + val result = table .where('a > (1 + 7)) - val optimized = tEnv.optimize(table.getRelNode) - val optimizedString = optimized.toString - assertTrue(optimizedString.contains(">(_1, 8)")) + val expected = unaryNode( + "DataSetCalc", + batchTableNode(0), + term("select", "a", "b", "c"), + term("where", ">(a, 8)") + ) + + util.verifyTable(result, expected) } @Test def testReduceCalcExpressionForStreamSQL(): Unit = { - val tEnv = mockStreamTableEnvironment() + val util = streamTestUtil() + util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c) val sqlQuery = "SELECT " + "(3+4)+a, " + @@ -245,29 +245,34 @@ class ExpressionReductionTest { "CAST(TRUE AS VARCHAR) || 'X'" + "FROM MyTable WHERE 
a>(1+7)" - val table = tEnv.sql(sqlQuery) - - val optimized = tEnv.optimize(table.getRelNode) - val optimizedString = optimized.toString - assertTrue(optimizedString.contains(">(_1, 8)")) - assertTrue(optimizedString.contains("+(7, _1) AS EXPR$0")) - assertTrue(optimizedString.contains("+(_2, 3) AS EXPR$1")) - assertTrue(optimizedString.contains("'b' AS EXPR$2")) - assertTrue(optimizedString.contains("'STRING' AS EXPR$3")) - assertTrue(optimizedString.contains("'teststring' AS EXPR$4")) - assertTrue(optimizedString.contains("null AS EXPR$5")) - assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS EXPR$6")) - assertTrue(optimizedString.contains("19 AS EXPR$7")) - assertTrue(optimizedString.contains("false AS EXPR$8")) - assertTrue(optimizedString.contains("true AS EXPR$9")) - assertTrue(optimizedString.contains("2 AS EXPR$10")) - assertTrue(optimizedString.contains("true AS EXPR$11")) - assertTrue(optimizedString.contains("'TRUEX' AS EXPR$12")) + val expected = unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", + "+(7, a) AS EXPR$0", + "+(b, 3) AS EXPR$1", + "'b' AS EXPR$2", + "'STRING' AS EXPR$3", + "'teststring' AS EXPR$4", + "null AS EXPR$5", + "1990-10-24 23:00:01 AS EXPR$6", + "19 AS EXPR$7", + "false AS EXPR$8", + "true AS EXPR$9", + "2 AS EXPR$10", + "true AS EXPR$11", + "'TRUEX' AS EXPR$12" + ), + term("where", ">(a, 8)") + ) + + util.verifySql(sqlQuery, expected) } @Test def testReduceProjectExpressionForStreamSQL(): Unit = { - val tEnv = mockStreamTableEnvironment() + val util = streamTestUtil() + util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c) val sqlQuery = "SELECT " + "(3+4)+a, " + @@ -285,46 +290,54 @@ class ExpressionReductionTest { "CAST(TRUE AS VARCHAR) || 'X'" + "FROM MyTable" - val table = tEnv.sql(sqlQuery) - - val optimized = tEnv.optimize(table.getRelNode) - val optimizedString = optimized.toString - assertTrue(optimizedString.contains("+(7, _1) AS EXPR$0")) - assertTrue(optimizedString.contains("+(_2, 3) AS 
EXPR$1")) - assertTrue(optimizedString.contains("'b' AS EXPR$2")) - assertTrue(optimizedString.contains("'STRING' AS EXPR$3")) - assertTrue(optimizedString.contains("'teststring' AS EXPR$4")) - assertTrue(optimizedString.contains("null AS EXPR$5")) - assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS EXPR$6")) - assertTrue(optimizedString.contains("19 AS EXPR$7")) - assertTrue(optimizedString.contains("false AS EXPR$8")) - assertTrue(optimizedString.contains("true AS EXPR$9")) - assertTrue(optimizedString.contains("2 AS EXPR$10")) - assertTrue(optimizedString.contains("true AS EXPR$11")) - assertTrue(optimizedString.contains("'TRUEX' AS EXPR$12")) + val expected = unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", + "+(7, a) AS EXPR$0", + "+(b, 3) AS EXPR$1", + "'b' AS EXPR$2", + "'STRING' AS EXPR$3", + "'teststring' AS EXPR$4", + "null AS EXPR$5", + "1990-10-24 23:00:01 AS EXPR$6", + "19 AS EXPR$7", + "false AS EXPR$8", + "true AS EXPR$9", + "2 AS EXPR$10", + "true AS EXPR$11", + "'TRUEX' AS EXPR$12" + ) + ) + + util.verifySql(sqlQuery, expected) } @Test def testReduceFilterExpressionForStreamSQL(): Unit = { - val tEnv = mockStreamTableEnvironment() + val util = streamTestUtil() + util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c) val sqlQuery = "SELECT " + "*" + "FROM MyTable WHERE a>(1+7)" - val table = tEnv.sql(sqlQuery) + val expected = unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "a", "b", "c"), + term("where", ">(a, 8)") + ) - val optimized = tEnv.optimize(table.getRelNode) - val optimizedString = optimized.toString - assertTrue(optimizedString.contains(">(_1, 8)")) + util.verifySql(sqlQuery, expected) } @Test def testReduceCalcExpressionForStreamTableAPI(): Unit = { - val tEnv = mockStreamTableEnvironment() + val util = streamTestUtil() + val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c) - val table = tEnv - .ingest("MyTable") + val result = table .where('a > (1 + 7)) .select((3 
+ 4).toExpr + 6, (11 === 1) ? ("a", "b"), @@ -336,28 +349,32 @@ class ExpressionReductionTest { 2.5.toExpr.floor(), true.cast(Types.STRING) + "X") - - val optimized = tEnv.optimize(table.getRelNode) - val optimizedString = optimized.toString - assertTrue(optimizedString.contains(">(_1, 8)")) - assertTrue(optimizedString.contains("13 AS _c0")) - assertTrue(optimizedString.contains("'b' AS _c1")) - assertTrue(optimizedString.contains("'STRING' AS _c2")) - assertTrue(optimizedString.contains("'teststring' AS _c3")) - assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS _c4")) - assertTrue(optimizedString.contains("false AS _c5")) - assertTrue(optimizedString.contains("true AS _c6")) - assertTrue(optimizedString.contains("2E0 AS _c7")) - assertTrue(optimizedString.contains("'TRUEX' AS _c8")) + val expected = unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", + "13 AS _c0", + "'b' AS _c1", + "'STRING' AS _c2", + "'teststring' AS _c3", + "1990-10-24 23:00:01 AS _c4", + "false AS _c5", + "true AS _c6", + "2E0 AS _c7", + "'TRUEX' AS _c8" + ), + term("where", ">(a, 8)") + ) + + util.verifyTable(result, expected) } @Test def testReduceProjectExpressionForStreamTableAPI(): Unit = { - val tEnv = mockStreamTableEnvironment() + val util = streamTestUtil() + val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c) - val table = tEnv - .ingest("MyTable") - .where('a > (1 + 7)) + val result = table .select((3 + 4).toExpr + 6, (11 === 1) ? 
("a", "b"), " STRING ".trim, @@ -368,33 +385,41 @@ class ExpressionReductionTest { 2.5.toExpr.floor(), true.cast(Types.STRING) + "X") - - val optimized = tEnv.optimize(table.getRelNode) - val optimizedString = optimized.toString - assertTrue(optimizedString.contains(">(_1, 8)")) - assertTrue(optimizedString.contains("13 AS _c0")) - assertTrue(optimizedString.contains("'b' AS _c1")) - assertTrue(optimizedString.contains("'STRING' AS _c2")) - assertTrue(optimizedString.contains("'teststring' AS _c3")) - assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS _c4")) - assertTrue(optimizedString.contains("false AS _c5")) - assertTrue(optimizedString.contains("true AS _c6")) - assertTrue(optimizedString.contains("2E0 AS _c7")) - assertTrue(optimizedString.contains("'TRUEX' AS _c8")) + val expected = unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", + "13 AS _c0", + "'b' AS _c1", + "'STRING' AS _c2", + "'teststring' AS _c3", + "1990-10-24 23:00:01 AS _c4", + "false AS _c5", + "true AS _c6", + "2E0 AS _c7", + "'TRUEX' AS _c8" + ) + ) + + util.verifyTable(result, expected) } @Test def testReduceFilterExpressionForStreamTableAPI(): Unit = { - val tEnv = mockStreamTableEnvironment() + val util = streamTestUtil() + val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c) - val table = tEnv - .ingest("MyTable") + val result = table .where('a > (1 + 7)) + val expected = unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "a", "b", "c"), + term("where", ">(a, 8)") + ) - val optimized = tEnv.optimize(table.getRelNode) - val optimizedString = optimized.toString - assertTrue(optimizedString.contains(">(_1, 8)")) + util.verifyTable(result, expected) } } http://git-wip-us.apache.org/repos/asf/flink/blob/2061852a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala new file mode 100644 index 0000000..fd43ed4 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.utils + +import org.apache.calcite.plan.RelOptUtil +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.{DataSet => JDataSet} +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment} +import org.apache.flink.api.table.expressions.Expression +import org.apache.flink.api.table.{Table, TableEnvironment} +import org.apache.flink.streaming.api.datastream.{DataStream => JDataStream} +import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} +import org.junit.Assert +import org.mockito.Mockito.{mock, when} + +/** + * Test base for testing Table API / SQL plans. 
+ */ +class TableTestBase { + + def batchTestUtil(): BatchTableTestUtil = { + BatchTableTestUtil() + } + + def streamTestUtil(): StreamTableTestUtil = { + StreamTableTestUtil() + } + +} + +abstract class TableTestUtil { + def addTable[T: TypeInformation](name: String, fields: Expression*): Table + def verifySql(query: String, expected: String): Unit + def verifyTable(resultTable: Table, expected: String): Unit +} + +object TableTestUtil { + + // these methods are currently just for simplifying string construction, + // we could replace them with logic later + + def unaryNode(node: String, input: String, term: String*): String = { + s"""$node(${term.mkString(", ")}) + | $input + |""".stripMargin + } + + def binaryNode(node: String, left: String, right: String, term: String*): String = { + s"""$node(${term.mkString(", ")}) + | $left + | $right + |""".stripMargin + } + + def term(term: String, value: String*): String = { + s"$term=[${value.mkString(", ")}]" + } + + def batchTableNode(idx: Int): String = { + s"DataSetScan(table=[[_DataSetTable_$idx]])" + } + + def streamTableNode(idx: Int): String = { + s"DataStreamScan(table=[[_DataStreamTable_$idx]])" + } +} + +case class BatchTableTestUtil() extends TableTestUtil { + + val env = mock(classOf[ExecutionEnvironment]) + val tEnv = TableEnvironment.getTableEnvironment(env) + + def addTable[T: TypeInformation]( + name: String, + fields: Expression*) + : Table = { + val ds = mock(classOf[DataSet[T]]) + val jDs = mock(classOf[JDataSet[T]]) + when(ds.javaSet).thenReturn(jDs) + val typeInfo: TypeInformation[T] = implicitly[TypeInformation[T]] + when(jDs.getType).thenReturn(typeInfo) + + val t = ds.toTable(tEnv, fields: _*) + tEnv.registerTable(name, t) + t + } + + def verifySql(query: String, expected: String): Unit = { + verifyTable(tEnv.sql(query), expected) + } + + def verifyTable(resultTable: Table, expected: String): Unit = { + val relNode = resultTable.getRelNode + val optimized = tEnv.optimize(relNode) + val actual = 
RelOptUtil.toString(optimized) + Assert.assertEquals(expected, actual) + } +} + +case class StreamTableTestUtil() extends TableTestUtil { + + val env = mock(classOf[StreamExecutionEnvironment]) + val tEnv = TableEnvironment.getTableEnvironment(env) + + def addTable[T: TypeInformation]( + name: String, + fields: Expression*) + : Table = { + + val ds = mock(classOf[DataStream[T]]) + val jDs = mock(classOf[JDataStream[T]]) + when(ds.javaStream).thenReturn(jDs) + val typeInfo: TypeInformation[T] = implicitly[TypeInformation[T]] + when(jDs.getType).thenReturn(typeInfo) + + val t = ds.toTable(tEnv, fields: _*) + tEnv.registerTable(name, t) + t + } + + def verifySql(query: String, expected: String): Unit = { + verifyTable(tEnv.sql(query), expected) + } + + def verifyTable(resultTable: Table, expected: String): Unit = { + val relNode = resultTable.getRelNode + val optimized = tEnv.optimize(relNode) + val actual = RelOptUtil.toString(optimized) + Assert.assertEquals(expected, actual) + } +}