[FLINK-3656] [table] Add test base for logical unit testing.

This closes #2595


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2061852a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2061852a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2061852a

Branch: refs/heads/master
Commit: 2061852ad0993e35e7d6667ad5ca7028af16a3b7
Parents: cf22006
Author: twalthr <twal...@apache.org>
Authored: Tue Oct 4 18:32:52 2016 +0200
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Mon Oct 10 20:49:12 2016 +0200

----------------------------------------------------------------------
 .../api/table/ExpressionReductionTest.scala     | 413 ++++++++++---------
 .../flink/api/table/utils/TableTestBase.scala   | 148 +++++++
 2 files changed, 367 insertions(+), 194 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2061852a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala
index 925a818..2d4694e 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala
@@ -17,48 +17,19 @@
  */
 package org.apache.flink.api.table
 
-import org.apache.flink.api.java.{DataSet => JDataSet}
+import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, 
createTypeInformation}
-import org.apache.flink.streaming.api.datastream.{DataStream => JDataStream}
-import org.apache.flink.streaming.api.scala.{DataStream, 
StreamExecutionEnvironment}
-import org.junit.Assert._
+import org.apache.flink.api.table.utils.TableTestBase
+import org.apache.flink.api.table.utils.TableTestUtil._
 import org.junit.Test
-import org.mockito.Mockito.{mock, when}
 
-class ExpressionReductionTest {
 
-  private def mockBatchTableEnvironment(): BatchTableEnvironment = {
-    val env = mock(classOf[ExecutionEnvironment])
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val ds = mock(classOf[DataSet[(Int, Long, String)]])
-    val jDs = mock(classOf[JDataSet[(Int, Long, String)]])
-    when(ds.javaSet).thenReturn(jDs)
-    when(jDs.getType).thenReturn(createTypeInformation[(Int, Long, String)])
-
-    val t = ds.toTable(tEnv).as('a, 'b, 'c)
-    tEnv.registerTable("MyTable", t)
-    tEnv
-  }
-
-  private def mockStreamTableEnvironment(): StreamTableEnvironment = {
-    val env = mock(classOf[StreamExecutionEnvironment])
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val ds = mock(classOf[DataStream[(Int, Long, String)]])
-    val jDs = mock(classOf[JDataStream[(Int, Long, String)]])
-    when(ds.javaStream).thenReturn(jDs)
-    when(jDs.getType).thenReturn(createTypeInformation[(Int, Long, String)])
-
-    val t = ds.toTable(tEnv).as('a, 'b, 'c)
-    tEnv.registerTable("MyTable", t)
-    tEnv
-  }
+class ExpressionReductionTest extends TableTestBase {
 
   @Test
   def testReduceCalcExpressionForBatchSQL(): Unit = {
-    val tEnv = mockBatchTableEnvironment()
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
 
     val sqlQuery = "SELECT " +
       "(3+4)+a, " +
@@ -76,29 +47,34 @@ class ExpressionReductionTest {
       "CAST(TRUE AS VARCHAR) || 'X'" +
       "FROM MyTable WHERE a>(1+7)"
 
-    val table = tEnv.sql(sqlQuery)
-
-    val optimized = tEnv.optimize(table.getRelNode)
-    val optimizedString = optimized.toString
-    assertTrue(optimizedString.contains(">(_1, 8)"))
-    assertTrue(optimizedString.contains("+(7, _1) AS EXPR$0"))
-    assertTrue(optimizedString.contains("+(_2, 3) AS EXPR$1"))
-    assertTrue(optimizedString.contains("'b' AS EXPR$2"))
-    assertTrue(optimizedString.contains("'STRING' AS EXPR$3"))
-    assertTrue(optimizedString.contains("'teststring' AS EXPR$4"))
-    assertTrue(optimizedString.contains("null AS EXPR$5"))
-    assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS EXPR$6"))
-    assertTrue(optimizedString.contains("19 AS EXPR$7"))
-    assertTrue(optimizedString.contains("false AS EXPR$8"))
-    assertTrue(optimizedString.contains("true AS EXPR$9"))
-    assertTrue(optimizedString.contains("2 AS EXPR$10"))
-    assertTrue(optimizedString.contains("true AS EXPR$11"))
-    assertTrue(optimizedString.contains("'TRUEX' AS EXPR$12"))
+    val expected = unaryNode(
+      "DataSetCalc",
+      batchTableNode(0),
+      term("select",
+        "+(7, a) AS EXPR$0",
+        "+(b, 3) AS EXPR$1",
+        "'b' AS EXPR$2",
+        "'STRING' AS EXPR$3",
+        "'teststring' AS EXPR$4",
+        "null AS EXPR$5",
+        "1990-10-24 23:00:01 AS EXPR$6",
+        "19 AS EXPR$7",
+        "false AS EXPR$8",
+        "true AS EXPR$9",
+        "2 AS EXPR$10",
+        "true AS EXPR$11",
+        "'TRUEX' AS EXPR$12"
+      ),
+      term("where", ">(a, 8)")
+    )
+
+    util.verifySql(sqlQuery, expected)
   }
 
   @Test
   def testReduceProjectExpressionForBatchSQL(): Unit = {
-    val tEnv = mockBatchTableEnvironment()
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
 
     val sqlQuery = "SELECT " +
       "(3+4)+a, " +
@@ -116,46 +92,54 @@ class ExpressionReductionTest {
       "CAST(TRUE AS VARCHAR) || 'X'" +
       "FROM MyTable"
 
-    val table = tEnv.sql(sqlQuery)
-
-    val optimized = tEnv.optimize(table.getRelNode)
-    val optimizedString = optimized.toString
-    assertTrue(optimizedString.contains("+(7, _1) AS EXPR$0"))
-    assertTrue(optimizedString.contains("+(_2, 3) AS EXPR$1"))
-    assertTrue(optimizedString.contains("'b' AS EXPR$2"))
-    assertTrue(optimizedString.contains("'STRING' AS EXPR$3"))
-    assertTrue(optimizedString.contains("'teststring' AS EXPR$4"))
-    assertTrue(optimizedString.contains("null AS EXPR$5"))
-    assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS EXPR$6"))
-    assertTrue(optimizedString.contains("19 AS EXPR$7"))
-    assertTrue(optimizedString.contains("false AS EXPR$8"))
-    assertTrue(optimizedString.contains("true AS EXPR$9"))
-    assertTrue(optimizedString.contains("2 AS EXPR$10"))
-    assertTrue(optimizedString.contains("true AS EXPR$11"))
-    assertTrue(optimizedString.contains("'TRUEX' AS EXPR$12"))
+    val expected = unaryNode(
+      "DataSetCalc",
+      batchTableNode(0),
+      term("select",
+        "+(7, a) AS EXPR$0",
+        "+(b, 3) AS EXPR$1",
+        "'b' AS EXPR$2",
+        "'STRING' AS EXPR$3",
+        "'teststring' AS EXPR$4",
+        "null AS EXPR$5",
+        "1990-10-24 23:00:01 AS EXPR$6",
+        "19 AS EXPR$7",
+        "false AS EXPR$8",
+        "true AS EXPR$9",
+        "2 AS EXPR$10",
+        "true AS EXPR$11",
+        "'TRUEX' AS EXPR$12"
+      )
+    )
+
+    util.verifySql(sqlQuery, expected)
   }
 
   @Test
   def testReduceFilterExpressionForBatchSQL(): Unit = {
-    val tEnv = mockBatchTableEnvironment()
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
 
     val sqlQuery = "SELECT " +
       "*" +
       "FROM MyTable WHERE a>(1+7)"
 
-    val table = tEnv.sql(sqlQuery)
+    val expected = unaryNode(
+      "DataSetCalc",
+      batchTableNode(0),
+      term("select", "a", "b", "c"),
+      term("where", ">(a, 8)")
+    )
 
-    val optimized = tEnv.optimize(table.getRelNode)
-    val optimizedString = optimized.toString
-    assertTrue(optimizedString.contains(">(_1, 8)"))
+    util.verifySql(sqlQuery, expected)
   }
 
   @Test
   def testReduceCalcExpressionForBatchTableAPI(): Unit = {
-    val tEnv = mockBatchTableEnvironment()
+    val util = batchTestUtil()
+    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
 
-    val table = tEnv
-      .scan("MyTable")
+    val result = table
       .where('a > (1 + 7))
       .select((3 + 4).toExpr + 6,
               (11 === 1) ? ("a", "b"),
@@ -167,27 +151,32 @@ class ExpressionReductionTest {
               2.5.toExpr.floor(),
               true.cast(Types.STRING) + "X")
 
-
-    val optimized = tEnv.optimize(table.getRelNode)
-    val optimizedString = optimized.toString
-    assertTrue(optimizedString.contains(">(_1, 8)"))
-    assertTrue(optimizedString.contains("13 AS _c0"))
-    assertTrue(optimizedString.contains("'b' AS _c1"))
-    assertTrue(optimizedString.contains("'STRING' AS _c2"))
-    assertTrue(optimizedString.contains("'teststring' AS _c3"))
-    assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS _c4"))
-    assertTrue(optimizedString.contains("false AS _c5"))
-    assertTrue(optimizedString.contains("true AS _c6"))
-    assertTrue(optimizedString.contains("2E0 AS _c7"))
-    assertTrue(optimizedString.contains("'TRUEX' AS _c8"))
+    val expected = unaryNode(
+      "DataSetCalc",
+      batchTableNode(0),
+      term("select",
+        "13 AS _c0",
+        "'b' AS _c1",
+        "'STRING' AS _c2",
+        "'teststring' AS _c3",
+        "1990-10-24 23:00:01 AS _c4",
+        "false AS _c5",
+        "true AS _c6",
+        "2E0 AS _c7",
+        "'TRUEX' AS _c8"
+      ),
+      term("where", ">(a, 8)")
+    )
+
+    util.verifyTable(result, expected)
   }
 
   @Test
   def testReduceProjectExpressionForBatchTableAPI(): Unit = {
-    val tEnv = mockBatchTableEnvironment()
+    val util = batchTestUtil()
+    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
 
-    val table = tEnv
-      .scan("MyTable")
+    val result = table
       .select((3 + 4).toExpr + 6,
               (11 === 1) ? ("a", "b"),
               " STRING ".trim,
@@ -198,36 +187,47 @@ class ExpressionReductionTest {
               2.5.toExpr.floor(),
               true.cast(Types.STRING) + "X")
 
-
-    val optimized = tEnv.optimize(table.getRelNode)
-    val optimizedString = optimized.toString
-    assertTrue(optimizedString.contains("13 AS _c0"))
-    assertTrue(optimizedString.contains("'b' AS _c1"))
-    assertTrue(optimizedString.contains("'STRING' AS _c2"))
-    assertTrue(optimizedString.contains("'teststring' AS _c3"))
-    assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS _c4"))
-    assertTrue(optimizedString.contains("false AS _c5"))
-    assertTrue(optimizedString.contains("true AS _c6"))
-    assertTrue(optimizedString.contains("2E0 AS _c7"))
-    assertTrue(optimizedString.contains("'TRUEX' AS _c8"))
+    val expected = unaryNode(
+      "DataSetCalc",
+      batchTableNode(0),
+      term("select",
+        "13 AS _c0",
+        "'b' AS _c1",
+        "'STRING' AS _c2",
+        "'teststring' AS _c3",
+        "1990-10-24 23:00:01 AS _c4",
+        "false AS _c5",
+        "true AS _c6",
+        "2E0 AS _c7",
+        "'TRUEX' AS _c8"
+      )
+    )
+
+    util.verifyTable(result, expected)
   }
 
   @Test
   def testReduceFilterExpressionForBatchTableAPI(): Unit = {
-    val tEnv = mockBatchTableEnvironment()
+    val util = batchTestUtil()
+    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
 
-    val table = tEnv
-      .scan("MyTable")
+    val result = table
       .where('a > (1 + 7))
 
-    val optimized = tEnv.optimize(table.getRelNode)
-    val optimizedString = optimized.toString
-    assertTrue(optimizedString.contains(">(_1, 8)"))
+    val expected = unaryNode(
+      "DataSetCalc",
+      batchTableNode(0),
+      term("select", "a", "b", "c"),
+      term("where", ">(a, 8)")
+    )
+
+    util.verifyTable(result, expected)
   }
 
   @Test
   def testReduceCalcExpressionForStreamSQL(): Unit = {
-    val tEnv = mockStreamTableEnvironment()
+    val util = streamTestUtil()
+    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
 
     val sqlQuery = "SELECT " +
       "(3+4)+a, " +
@@ -245,29 +245,34 @@ class ExpressionReductionTest {
       "CAST(TRUE AS VARCHAR) || 'X'" +
       "FROM MyTable WHERE a>(1+7)"
 
-    val table = tEnv.sql(sqlQuery)
-
-    val optimized = tEnv.optimize(table.getRelNode)
-    val optimizedString = optimized.toString
-    assertTrue(optimizedString.contains(">(_1, 8)"))
-    assertTrue(optimizedString.contains("+(7, _1) AS EXPR$0"))
-    assertTrue(optimizedString.contains("+(_2, 3) AS EXPR$1"))
-    assertTrue(optimizedString.contains("'b' AS EXPR$2"))
-    assertTrue(optimizedString.contains("'STRING' AS EXPR$3"))
-    assertTrue(optimizedString.contains("'teststring' AS EXPR$4"))
-    assertTrue(optimizedString.contains("null AS EXPR$5"))
-    assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS EXPR$6"))
-    assertTrue(optimizedString.contains("19 AS EXPR$7"))
-    assertTrue(optimizedString.contains("false AS EXPR$8"))
-    assertTrue(optimizedString.contains("true AS EXPR$9"))
-    assertTrue(optimizedString.contains("2 AS EXPR$10"))
-    assertTrue(optimizedString.contains("true AS EXPR$11"))
-    assertTrue(optimizedString.contains("'TRUEX' AS EXPR$12"))
+    val expected = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select",
+        "+(7, a) AS EXPR$0",
+        "+(b, 3) AS EXPR$1",
+        "'b' AS EXPR$2",
+        "'STRING' AS EXPR$3",
+        "'teststring' AS EXPR$4",
+        "null AS EXPR$5",
+        "1990-10-24 23:00:01 AS EXPR$6",
+        "19 AS EXPR$7",
+        "false AS EXPR$8",
+        "true AS EXPR$9",
+        "2 AS EXPR$10",
+        "true AS EXPR$11",
+        "'TRUEX' AS EXPR$12"
+      ),
+      term("where", ">(a, 8)")
+    )
+
+    util.verifySql(sqlQuery, expected)
   }
 
   @Test
   def testReduceProjectExpressionForStreamSQL(): Unit = {
-    val tEnv = mockStreamTableEnvironment()
+    val util = streamTestUtil()
+    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
 
     val sqlQuery = "SELECT " +
       "(3+4)+a, " +
@@ -285,46 +290,54 @@ class ExpressionReductionTest {
       "CAST(TRUE AS VARCHAR) || 'X'" +
       "FROM MyTable"
 
-    val table = tEnv.sql(sqlQuery)
-
-    val optimized = tEnv.optimize(table.getRelNode)
-    val optimizedString = optimized.toString
-    assertTrue(optimizedString.contains("+(7, _1) AS EXPR$0"))
-    assertTrue(optimizedString.contains("+(_2, 3) AS EXPR$1"))
-    assertTrue(optimizedString.contains("'b' AS EXPR$2"))
-    assertTrue(optimizedString.contains("'STRING' AS EXPR$3"))
-    assertTrue(optimizedString.contains("'teststring' AS EXPR$4"))
-    assertTrue(optimizedString.contains("null AS EXPR$5"))
-    assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS EXPR$6"))
-    assertTrue(optimizedString.contains("19 AS EXPR$7"))
-    assertTrue(optimizedString.contains("false AS EXPR$8"))
-    assertTrue(optimizedString.contains("true AS EXPR$9"))
-    assertTrue(optimizedString.contains("2 AS EXPR$10"))
-    assertTrue(optimizedString.contains("true AS EXPR$11"))
-    assertTrue(optimizedString.contains("'TRUEX' AS EXPR$12"))
+    val expected = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select",
+        "+(7, a) AS EXPR$0",
+        "+(b, 3) AS EXPR$1",
+        "'b' AS EXPR$2",
+        "'STRING' AS EXPR$3",
+        "'teststring' AS EXPR$4",
+        "null AS EXPR$5",
+        "1990-10-24 23:00:01 AS EXPR$6",
+        "19 AS EXPR$7",
+        "false AS EXPR$8",
+        "true AS EXPR$9",
+        "2 AS EXPR$10",
+        "true AS EXPR$11",
+        "'TRUEX' AS EXPR$12"
+      )
+    )
+
+    util.verifySql(sqlQuery, expected)
   }
 
   @Test
   def testReduceFilterExpressionForStreamSQL(): Unit = {
-    val tEnv = mockStreamTableEnvironment()
+    val util = streamTestUtil()
+    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
 
     val sqlQuery = "SELECT " +
       "*" +
       "FROM MyTable WHERE a>(1+7)"
 
-    val table = tEnv.sql(sqlQuery)
+    val expected = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select", "a", "b", "c"),
+      term("where", ">(a, 8)")
+    )
 
-    val optimized = tEnv.optimize(table.getRelNode)
-    val optimizedString = optimized.toString
-    assertTrue(optimizedString.contains(">(_1, 8)"))
+    util.verifySql(sqlQuery, expected)
   }
 
   @Test
   def testReduceCalcExpressionForStreamTableAPI(): Unit = {
-    val tEnv = mockStreamTableEnvironment()
+    val util = streamTestUtil()
+    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
 
-    val table = tEnv
-      .ingest("MyTable")
+    val result = table
       .where('a > (1 + 7))
       .select((3 + 4).toExpr + 6,
               (11 === 1) ? ("a", "b"),
@@ -336,28 +349,32 @@ class ExpressionReductionTest {
               2.5.toExpr.floor(),
               true.cast(Types.STRING) + "X")
 
-
-    val optimized = tEnv.optimize(table.getRelNode)
-    val optimizedString = optimized.toString
-    assertTrue(optimizedString.contains(">(_1, 8)"))
-    assertTrue(optimizedString.contains("13 AS _c0"))
-    assertTrue(optimizedString.contains("'b' AS _c1"))
-    assertTrue(optimizedString.contains("'STRING' AS _c2"))
-    assertTrue(optimizedString.contains("'teststring' AS _c3"))
-    assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS _c4"))
-    assertTrue(optimizedString.contains("false AS _c5"))
-    assertTrue(optimizedString.contains("true AS _c6"))
-    assertTrue(optimizedString.contains("2E0 AS _c7"))
-    assertTrue(optimizedString.contains("'TRUEX' AS _c8"))
+    val expected = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select",
+        "13 AS _c0",
+        "'b' AS _c1",
+        "'STRING' AS _c2",
+        "'teststring' AS _c3",
+        "1990-10-24 23:00:01 AS _c4",
+        "false AS _c5",
+        "true AS _c6",
+        "2E0 AS _c7",
+        "'TRUEX' AS _c8"
+      ),
+      term("where", ">(a, 8)")
+    )
+
+    util.verifyTable(result, expected)
   }
 
   @Test
   def testReduceProjectExpressionForStreamTableAPI(): Unit = {
-    val tEnv = mockStreamTableEnvironment()
+    val util = streamTestUtil()
+    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
 
-    val table =  tEnv
-      .ingest("MyTable")
-      .where('a > (1 + 7))
+    val result =  table
       .select((3 + 4).toExpr + 6,
               (11 === 1) ? ("a", "b"),
               " STRING ".trim,
@@ -368,33 +385,41 @@ class ExpressionReductionTest {
               2.5.toExpr.floor(),
               true.cast(Types.STRING) + "X")
 
-
-    val optimized = tEnv.optimize(table.getRelNode)
-    val optimizedString = optimized.toString
-    assertTrue(optimizedString.contains(">(_1, 8)"))
-    assertTrue(optimizedString.contains("13 AS _c0"))
-    assertTrue(optimizedString.contains("'b' AS _c1"))
-    assertTrue(optimizedString.contains("'STRING' AS _c2"))
-    assertTrue(optimizedString.contains("'teststring' AS _c3"))
-    assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS _c4"))
-    assertTrue(optimizedString.contains("false AS _c5"))
-    assertTrue(optimizedString.contains("true AS _c6"))
-    assertTrue(optimizedString.contains("2E0 AS _c7"))
-    assertTrue(optimizedString.contains("'TRUEX' AS _c8"))
+    val expected = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select",
+        "13 AS _c0",
+        "'b' AS _c1",
+        "'STRING' AS _c2",
+        "'teststring' AS _c3",
+        "1990-10-24 23:00:01 AS _c4",
+        "false AS _c5",
+        "true AS _c6",
+        "2E0 AS _c7",
+        "'TRUEX' AS _c8"
+      )
+    )
+
+    util.verifyTable(result, expected)
   }
 
   @Test
   def testReduceFilterExpressionForStreamTableAPI(): Unit = {
-    val tEnv = mockStreamTableEnvironment()
+    val util = streamTestUtil()
+    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
 
-    val table = tEnv
-      .ingest("MyTable")
+    val result = table
       .where('a > (1 + 7))
 
+    val expected = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select", "a", "b", "c"),
+      term("where", ">(a, 8)")
+    )
 
-    val optimized = tEnv.optimize(table.getRelNode)
-    val optimizedString = optimized.toString
-    assertTrue(optimizedString.contains(">(_1, 8)"))
+    util.verifyTable(result, expected)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2061852a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala
new file mode 100644
index 0000000..fd43ed4
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.utils
+
+import org.apache.calcite.plan.RelOptUtil
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.{DataSet => JDataSet}
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
+import org.apache.flink.api.table.expressions.Expression
+import org.apache.flink.api.table.{Table, TableEnvironment}
+import org.apache.flink.streaming.api.datastream.{DataStream => JDataStream}
+import org.apache.flink.streaming.api.scala.{DataStream, 
StreamExecutionEnvironment}
+import org.junit.Assert
+import org.mockito.Mockito.{mock, when}
+
+/**
+  * Test base for testing Table API / SQL plans.
+  */
+class TableTestBase {
+
+  def batchTestUtil(): BatchTableTestUtil = {
+    BatchTableTestUtil()
+  }
+
+  def streamTestUtil(): StreamTableTestUtil = {
+    StreamTableTestUtil()
+  }
+
+}
+
+abstract class TableTestUtil {
+  def addTable[T: TypeInformation](name: String, fields: Expression*): Table
+  def verifySql(query: String, expected: String): Unit
+  def verifyTable(resultTable: Table, expected: String): Unit
+}
+
+object TableTestUtil {
+
+  // this methods are currently just for simplifying string construction,
+  // we could replace it with logic later
+
+  def unaryNode(node: String, input: String, term: String*): String = {
+    s"""$node(${term.mkString(", ")})
+       |  $input
+       |""".stripMargin
+  }
+
+  def binaryNode(node: String, left: String, right: String, term: String*): 
String = {
+    s"""$node(${term.mkString(", ")})
+       |  $left
+       |  $right
+       |""".stripMargin
+  }
+
+  def term(term: String, value: String*): String = {
+    s"$term=[${value.mkString(", ")}]"
+  }
+
+  def batchTableNode(idx: Int): String = {
+    s"DataSetScan(table=[[_DataSetTable_$idx]])"
+  }
+
+  def streamTableNode(idx: Int): String = {
+    s"DataStreamScan(table=[[_DataStreamTable_$idx]])"
+  }
+}
+
+case class BatchTableTestUtil() extends TableTestUtil {
+
+  val env = mock(classOf[ExecutionEnvironment])
+  val tEnv = TableEnvironment.getTableEnvironment(env)
+
+  def addTable[T: TypeInformation](
+      name: String,
+      fields: Expression*)
+    : Table = {
+    val ds = mock(classOf[DataSet[T]])
+    val jDs = mock(classOf[JDataSet[T]])
+    when(ds.javaSet).thenReturn(jDs)
+    val typeInfo: TypeInformation[T] = implicitly[TypeInformation[T]]
+    when(jDs.getType).thenReturn(typeInfo)
+
+    val t = ds.toTable(tEnv, fields: _*)
+    tEnv.registerTable(name, t)
+    t
+  }
+
+  def verifySql(query: String, expected: String): Unit = {
+    verifyTable(tEnv.sql(query), expected)
+  }
+
+  def verifyTable(resultTable: Table, expected: String): Unit = {
+    val relNode = resultTable.getRelNode
+    val optimized = tEnv.optimize(relNode)
+    val actual = RelOptUtil.toString(optimized)
+    Assert.assertEquals(expected, actual)
+  }
+}
+
+case class StreamTableTestUtil() extends TableTestUtil {
+
+  val env = mock(classOf[StreamExecutionEnvironment])
+  val tEnv = TableEnvironment.getTableEnvironment(env)
+
+  def addTable[T: TypeInformation](
+      name: String,
+      fields: Expression*)
+    : Table = {
+
+    val ds = mock(classOf[DataStream[T]])
+    val jDs = mock(classOf[JDataStream[T]])
+    when(ds.javaStream).thenReturn(jDs)
+    val typeInfo: TypeInformation[T] = implicitly[TypeInformation[T]]
+    when(jDs.getType).thenReturn(typeInfo)
+
+    val t = ds.toTable(tEnv, fields: _*)
+    tEnv.registerTable(name, t)
+    t
+  }
+
+  def verifySql(query: String, expected: String): Unit = {
+    verifyTable(tEnv.sql(query), expected)
+  }
+
+  def verifyTable(resultTable: Table, expected: String): Unit = {
+    val relNode = resultTable.getRelNode
+    val optimized = tEnv.optimize(relNode)
+    val actual = RelOptUtil.toString(optimized)
+    Assert.assertEquals(expected, actual)
+  }
+}

Reply via email to