[FLINK-3603][tableAPI] Enable and fix Table API explain.

This closes #1783
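
For reference, the re-enabled Table.explain(extended) prints the Calcite logical plan ("Abstract Syntax Tree") followed by the optimizer's physical execution plan. A minimal usage sketch based on the test code in this commit (note the method is declared private[flink], so callers must live under the org.apache.flink package, as the tests do):

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.table._

    case class WC(count: Int, word: String)

    val env = ExecutionEnvironment.getExecutionEnvironment
    val table = env.fromElements(WC(1, "hello"), WC(2, "ciao"))
      .as('count as 'a, 'word as 'b)

    // Prints "== Abstract Syntax Tree ==" and "== Physical Execution Plan ==";
    // pass true for the extended plan with size/cardinality estimates.
    println(table.filter("a % 2 = 0").explain(true))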


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/89d48f5c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/89d48f5c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/89d48f5c

Branch: refs/heads/master
Commit: 89d48f5ccd8eb423a022ce7ad124ffa95c535268
Parents: a779d5c
Author: Fabian Hueske <fhue...@apache.org>
Authored: Fri Mar 11 00:53:56 2016 +0100
Committer: vasia <va...@apache.org>
Committed: Fri Mar 18 14:44:51 2016 +0100

----------------------------------------------------------------------
 .../api/table/plan/TranslationContext.scala     |   2 +
 .../plan/nodes/dataset/DataSetAggregate.scala   |   1 -
 .../table/plan/nodes/dataset/DataSetJoin.scala  |   2 +-
 .../org/apache/flink/api/table/table.scala      |  38 ++++---
 .../api/java/table/test/SqlExplainITCase.java   |  61 +++++-----
 .../api/scala/table/test/SqlExplainITCase.scala | 102 -----------------
 .../api/scala/table/test/SqlExplainTest.scala   | 110 +++++++++++++++++++
 .../src/test/scala/resources/testFilter0.out    |   9 +-
 .../src/test/scala/resources/testFilter1.out    |  17 +--
 .../src/test/scala/resources/testJoin0.out      |  35 ++++--
 .../src/test/scala/resources/testJoin1.out      |  52 +++++++--
 .../src/test/scala/resources/testUnion0.out     |   8 +-
 .../src/test/scala/resources/testUnion1.out     |   8 +-
 13 files changed, 254 insertions(+), 191 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/89d48f5c/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala
index 5e8e1bc..31541ad 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala
@@ -61,6 +61,8 @@ object TranslationContext {
 
     relBuilder = RelBuilder.create(frameworkConfig)
 
+    nameCntr.set(0)
+
   }
 
   def addDataSet(newTable: DataSetTable[_]): String = {
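
Note on the nameCntr.set(0) added above: registered DataSets are named DataSetTable_0, DataSetTable_1, ... (the names that appear in the expected .out files below), so the counter has to restart at zero whenever the context is reset, or plans would not be reproducible across tests. A hypothetical, simplified sketch of that naming scheme, assuming nameCntr is an AtomicInteger as the set(0) call suggests:

    import java.util.concurrent.atomic.AtomicInteger

    // Simplified mirror of TranslationContext's table naming; not the actual code.
    object NamingSketch {
      private val nameCntr = new AtomicInteger(0)

      def reset(): Unit = nameCntr.set(0)

      // Each registered table gets a fresh "DataSetTable_<n>" name.
      def newTableName(): String = s"DataSetTable_${nameCntr.getAndIncrement()}"
    }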

http://git-wip-us.apache.org/repos/asf/flink/blob/89d48f5c/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
index 6bf7309..d9a0a93 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
@@ -93,7 +93,6 @@ class DataSetAggregate(
 
     val rowTypeInfo = new RowTypeInfo(fieldTypes, rowType.getFieldNames.asScala)
     val aggString = aggregationToString
-    val rowTypeInfo = new RowTypeInfo(fieldTypes)
     val mappedInput = inputDS.map(aggregateResult._1).name(s"prepare $aggString")
     val groupReduceFunction = aggregateResult._2
 

http://git-wip-us.apache.org/repos/asf/flink/blob/89d48f5c/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
index 5c1f9bb..ffb3e1b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
@@ -156,7 +156,7 @@ class DataSetJoin(
     val condString = s"where: (${getExpressionString(joinCondition, inFields, None)})"
     val outFieldString = s"join: (${rowType.getFieldNames.asScala.toList.mkString(", ")})"
 
-    condString + ", "+outFieldString
+    condString + ", " + outFieldString
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/89d48f5c/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
index 43c097e..53c63cb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.api.table
 
+import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataTypeField
 import org.apache.calcite.rel.core.JoinRelType
@@ -26,11 +27,16 @@ import org.apache.calcite.sql.SqlKind
 import org.apache.calcite.tools.RelBuilder
 import org.apache.calcite.tools.RelBuilder.{AggCall, GroupKey}
 import org.apache.calcite.util.NlsString
+import org.apache.flink.api.java.io.DiscardingOutputFormat
+import org.apache.flink.api.table.explain.PlanJsonParser
 import org.apache.flink.api.table.plan.{PlanGenException, RexNodeTranslator}
 import RexNodeTranslator.{toRexNode, extractAggCalls}
 import org.apache.flink.api.table.expressions.{Naming, UnresolvedFieldReference, Expression}
 import org.apache.flink.api.table.parser.ExpressionParser
 
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+
 import scala.collection.JavaConverters._
 
 case class BaseTable(
@@ -353,23 +359,21 @@ class Table(
   }
 
   /**
-   * Get the process of the sql parsing, print AST and physical execution plan.The AST
-   * show the structure of the supplied statement. The execution plan shows how the table
-   * referenced by the statement will be scanned.
-   */
-  def explain(extended: Boolean): String = {
-
-    // TODO: enable once toDataSet() is working again
-
-//    val ast = operation
-//    val dataSet = this.toDataSet[Row]
-//    val env = dataSet.getExecutionEnvironment
-//    dataSet.output(new DiscardingOutputFormat[Row])
-//    val jasonSqlPlan = env.getExecutionPlan()
-//    val sqlPlan = PlanJsonParser.getSqlExecutionPlan(jasonSqlPlan, extended)
-//    val result = "== Abstract Syntax Tree ==\n" + ast + "\n\n" + "== Physical Execution Plan ==" +
-//      "\n" + sqlPlan
-//    return result
+    * Get the process of the sql parsing, print AST and physical execution plan.The AST
+    * show the structure of the supplied statement. The execution plan shows how the table
+    * referenced by the statement will be scanned.
+    */
+  private[flink] def explain(extended: Boolean): String = {
+
+    val ast = RelOptUtil.toString(relNode)
+    val dataSet = this.toDataSet[Row]
+    dataSet.output(new DiscardingOutputFormat[Row])
+    val env = dataSet.getExecutionEnvironment
+    val jasonSqlPlan = env.getExecutionPlan()
+    val sqlPlan = PlanJsonParser.getSqlExecutionPlan(jasonSqlPlan, extended)
+    val result = "== Abstract Syntax Tree ==\n" + ast + "\n" + "== Physical Execution Plan ==" +
+      "\n" + sqlPlan
+    return result
 
     ""
   }
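
Worth noting about the implementation above: the DiscardingOutputFormat sink is attached only so that env.getExecutionPlan() has a complete job graph to describe; nothing is actually executed. The same trick can be used to inspect the JSON plan of any DataSet program, a rough sketch:

    import org.apache.flink.api.java.io.DiscardingOutputFormat
    import org.apache.flink.api.scala._

    val env = ExecutionEnvironment.getExecutionEnvironment
    val ds = env.fromElements(1, 2, 3).filter(_ % 2 == 0)

    // A throw-away sink completes the plan; getExecutionPlan() then returns
    // the plan as JSON without running the job.
    ds.output(new DiscardingOutputFormat[Int])
    println(env.getExecutionPlan())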

http://git-wip-us.apache.org/repos/asf/flink/blob/89d48f5c/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SqlExplainITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SqlExplainITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SqlExplainITCase.java
index da57c6e..9e09664 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SqlExplainITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SqlExplainITCase.java
@@ -22,7 +22,9 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.table.TableEnvironment;
 import org.apache.flink.api.table.Table;
-import org.junit.Ignore;
+import org.apache.flink.api.table.plan.TranslationContext;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.File;
@@ -30,7 +32,11 @@ import java.util.Scanner;
 
 import static org.junit.Assert.assertEquals;
 
-public class SqlExplainITCase {
+public class SqlExplainITCase extends MultipleProgramsTestBase {
+
+       public SqlExplainITCase() {
+               super(TestExecutionMode.CLUSTER);
+       }
 
        private static String testFilePath = SqlExplainITCase.class.getResource("/").getFile();
 
@@ -47,10 +53,14 @@ public class SqlExplainITCase {
                }
        }
 
-       @Ignore
+       @Before
+       public void resetContext() {
+               TranslationContext.reset();
+       }
+
        @Test
-       public void testGroupByWithoutExtended() throws Exception {
-               ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+       public void testFilterWithoutExtended() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                TableEnvironment tableEnv = new TableEnvironment();
 
                DataSet<WC> input = env.fromElements(
@@ -58,7 +68,7 @@ public class SqlExplainITCase {
                                new WC(2,"d"),
                                new WC(3,"d"));
 
-               Table table = tableEnv.fromDataSet(input).as("a, b");
+               Table table = tableEnv.fromDataSet(input, "count as a, word as b");
 
                String result = table
                                .filter("a % 2 = 0")
@@ -66,13 +76,12 @@ public class SqlExplainITCase {
                String source = new Scanner(new File(testFilePath +
                                "../../src/test/scala/resources/testFilter0.out"))
                                .useDelimiter("\\A").next();
-               assertEquals(result, source);
+               assertEquals(source, result);
        }
 
-       @Ignore
        @Test
-       public void testGroupByWithExtended() throws Exception {
-               ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+       public void testFilterWithExtended() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                TableEnvironment tableEnv = new TableEnvironment();
 
                DataSet<WC> input = env.fromElements(
@@ -80,7 +89,7 @@ public class SqlExplainITCase {
                                new WC(2, "d"),
                                new WC(3, "d"));
 
-               Table table = tableEnv.fromDataSet(input).as("a, b");
+               Table table = tableEnv.fromDataSet(input, "count as a, word as b");
 
                String result = table
                                .filter("a % 2 = 0")
@@ -88,13 +97,12 @@ public class SqlExplainITCase {
                String source = new Scanner(new File(testFilePath +
                                "../../src/test/scala/resources/testFilter1.out"))
                                .useDelimiter("\\A").next();
-               assertEquals(result, source);
+               assertEquals(source, result);
        }
 
-       @Ignore
        @Test
        public void testJoinWithoutExtended() throws Exception {
-               ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                TableEnvironment tableEnv = new TableEnvironment();
 
                DataSet<WC> input1 = env.fromElements(
@@ -102,14 +110,14 @@ public class SqlExplainITCase {
                                new WC(1, "d"),
                                new WC(1, "d"));
 
-               Table table1 = tableEnv.fromDataSet(input1).as("a, b");
+               Table table1 = tableEnv.fromDataSet(input1, "count as a, word as b");
 
                DataSet<WC> input2 = env.fromElements(
                                new WC(1,"d"),
                                new WC(1,"d"),
                                new WC(1,"d"));
 
-               Table table2 = tableEnv.fromDataSet(input2).as("c, d");
+               Table table2 = tableEnv.fromDataSet(input2, "count as c, word as d");
 
                String result = table1
                                .join(table2)
@@ -119,13 +127,12 @@ public class SqlExplainITCase {
                String source = new Scanner(new File(testFilePath +
                                "../../src/test/scala/resources/testJoin0.out"))
                                .useDelimiter("\\A").next();
-               assertEquals(result, source);
+               assertEquals(source, result);
        }
 
-       @Ignore
        @Test
        public void testJoinWithExtended() throws Exception {
-               ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                TableEnvironment tableEnv = new TableEnvironment();
 
                DataSet<WC> input1 = env.fromElements(
@@ -133,14 +140,14 @@ public class SqlExplainITCase {
                                new WC(1, "d"),
                                new WC(1, "d"));
 
-               Table table1 = tableEnv.fromDataSet(input1).as("a, b");
+               Table table1 = tableEnv.fromDataSet(input1, "count as a, word as b");
 
                DataSet<WC> input2 = env.fromElements(
                                new WC(1, "d"),
                                new WC(1, "d"),
                                new WC(1, "d"));
 
-               Table table2 = tableEnv.fromDataSet(input2).as("c, d");
+               Table table2 = tableEnv.fromDataSet(input2, "count as c, word as d");
 
                String result = table1
                                .join(table2)
@@ -150,13 +157,12 @@ public class SqlExplainITCase {
                String source = new Scanner(new File(testFilePath +
                                "../../src/test/scala/resources/testJoin1.out"))
                                .useDelimiter("\\A").next();
-               assertEquals(result, source);
+               assertEquals(source, result);
        }
 
-       @Ignore
        @Test
        public void testUnionWithoutExtended() throws Exception {
-               ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                TableEnvironment tableEnv = new TableEnvironment();
 
                DataSet<WC> input1 = env.fromElements(
@@ -179,13 +185,12 @@ public class SqlExplainITCase {
                String source = new Scanner(new File(testFilePath +
                                "../../src/test/scala/resources/testUnion0.out"))
                                .useDelimiter("\\A").next();
-               assertEquals(result, source);
+               assertEquals(source, result);
        }
 
-       @Ignore
        @Test
        public void testUnionWithExtended() throws Exception {
-               ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                TableEnvironment tableEnv = new TableEnvironment();
 
                DataSet<WC> input1 = env.fromElements(
@@ -208,6 +213,6 @@ public class SqlExplainITCase {
                String source = new Scanner(new File(testFilePath +
                                "../../src/test/scala/resources/testUnion1.out"))
                                .useDelimiter("\\A").next();
-               assertEquals(result, source);
+               assertEquals(source, result);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/89d48f5c/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainITCase.scala
deleted file mode 100644
index 954970f..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainITCase.scala
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.table.test
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-
-import org.junit._
-import org.junit.Assert.assertEquals
-
-case class WC(count: Int, word: String)
-
-class SqlExplainITCase {
-
-  val testFilePath = SqlExplainITCase.this.getClass.getResource("/").getFile
-
-  @Ignore
-  @Test
-  def testGroupByWithoutExtended() : Unit = {
-    val env = ExecutionEnvironment.createLocalEnvironment()
-    val expr = env.fromElements(WC(1, "hello"), WC(2, "hello"), WC(3, "ciao")).toTable.as('a, 'b)
-    val result = expr.filter("a % 2 = 0").explain()
-    val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testFilter0.out").mkString
-    assertEquals(result, source)
-  }
-
-  @Ignore
-  @Test
-  def testGroupByWithExtended() : Unit = {
-    val env = ExecutionEnvironment.createLocalEnvironment()
-    val expr = env.fromElements(WC(1, "hello"), WC(2, "hello"), WC(3, "ciao")).toTable.as('a, 'b)
-    val result = expr.filter("a % 2 = 0").explain(true)
-    val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testFilter1.out").mkString
-    assertEquals(result, source)
-  }
-
-  @Ignore
-  @Test
-  def testJoinWithoutExtended() : Unit = {
-    val env = ExecutionEnvironment.createLocalEnvironment()
-    val expr1 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "ciao")).toTable.as('a, 'b)
-    val expr2 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "java")).toTable.as('c, 'd)
-    val result = expr1.join(expr2).where("b = d").select("a, c").explain()
-    val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testJoin0.out").mkString
-    assertEquals(result, source)
-  }
-
-  @Ignore
-  @Test
-  def testJoinWithExtended() : Unit = {
-    val env = ExecutionEnvironment.createLocalEnvironment()
-    val expr1 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "ciao")).toTable.as('a, 'b)
-    val expr2 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "java")).toTable.as('c, 'd)
-    val result = expr1.join(expr2).where("b = d").select("a, c").explain(true)
-    val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testJoin1.out").mkString
-    assertEquals(result, source)
-  }
-
-  @Ignore
-  @Test
-  def testUnionWithoutExtended() : Unit = {
-    val env = ExecutionEnvironment.createLocalEnvironment()
-    val expr1 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "ciao")).toTable
-    val expr2 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "java")).toTable
-    val result = expr1.unionAll(expr2).explain()
-    val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testUnion0.out").mkString
-    assertEquals(result, source)
-  }
-
-  @Ignore
-  @Test
-  def testUnionWithExtended() : Unit = {
-    val env = ExecutionEnvironment.createLocalEnvironment()
-    val expr1 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "ciao")).toTable
-    val expr2 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "java")).toTable
-    val result = expr1.unionAll(expr2).explain(true)
-    val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testUnion1.out").mkString
-    assertEquals(result, source)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/89d48f5c/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainTest.scala
new file mode 100644
index 0000000..de07b24
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainTest.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.table.test
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.plan.TranslationContext
+import org.apache.flink.test.util.MultipleProgramsTestBase
+
+import org.junit._
+import org.junit.Assert.assertEquals
+
+case class WC(count: Int, word: String)
+
+class SqlExplainTest
+  extends MultipleProgramsTestBase(MultipleProgramsTestBase.TestExecutionMode.CLUSTER) {
+
+  val testFilePath = SqlExplainTest.this.getClass.getResource("/").getFile
+
+  @Before
+  def resetContext(): Unit = {
+    TranslationContext.reset()
+  }
+
+  @Test
+  def testFilterWithoutExtended() : Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val expr = env.fromElements(WC(1, "hello"), WC(2, "hello"), WC(3, "ciao"))
+      .as('count as 'a, 'word as 'b)
+    val result = expr.filter("a % 2 = 0").explain()
+    val source = scala.io.Source.fromFile(testFilePath +
+      "../../src/test/scala/resources/testFilter0.out").mkString
+    assertEquals(result, source)
+  }
+
+  @Test
+  def testFilterWithExtended() : Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val expr = env.fromElements(WC(1, "hello"), WC(2, "hello"), WC(3, "ciao"))
+      .as('count as 'a, 'word as 'b)
+    val result = expr.filter("a % 2 = 0").explain(true)
+    val source = scala.io.Source.fromFile(testFilePath +
+      "../../src/test/scala/resources/testFilter1.out").mkString
+    assertEquals(result, source)
+  }
+
+  @Test
+  def testJoinWithoutExtended() : Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val expr1 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "ciao"))
+      .as('count as 'a, 'word as 'b)
+    val expr2 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "java"))
+      .as('count as 'c, 'word as 'd)
+    val result = expr1.join(expr2).where("b = d").select("a, c").explain()
+    val source = scala.io.Source.fromFile(testFilePath +
+      "../../src/test/scala/resources/testJoin0.out").mkString
+    assertEquals(result, source)
+  }
+
+  @Test
+  def testJoinWithExtended() : Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val expr1 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "ciao"))
+      .as('count as 'a, 'word as 'b)
+    val expr2 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "java"))
+      .as('count as 'c, 'word as 'd)
+    val result = expr1.join(expr2).where("b = d").select("a, c").explain(true)
+    val source = scala.io.Source.fromFile(testFilePath +
+      "../../src/test/scala/resources/testJoin1.out").mkString
+    assertEquals(result, source)
+  }
+
+  @Test
+  def testUnionWithoutExtended() : Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val expr1 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "ciao")).toTable
+    val expr2 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "java")).toTable
+    val result = expr1.unionAll(expr2).explain()
+    val source = scala.io.Source.fromFile(testFilePath +
+      "../../src/test/scala/resources/testUnion0.out").mkString
+    assertEquals(result, source)
+  }
+
+  @Test
+  def testUnionWithExtended() : Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val expr1 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "ciao")).toTable
+    val expr2 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "java")).toTable
+    val result = expr1.unionAll(expr2).explain(true)
+    val source = scala.io.Source.fromFile(testFilePath +
+      "../../src/test/scala/resources/testUnion1.out").mkString
+    assertEquals(result, source)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/89d48f5c/flink-libraries/flink-table/src/test/scala/resources/testFilter0.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testFilter0.out b/flink-libraries/flink-table/src/test/scala/resources/testFilter0.out
index 062fc90..1d0198d 100644
--- a/flink-libraries/flink-table/src/test/scala/resources/testFilter0.out
+++ b/flink-libraries/flink-table/src/test/scala/resources/testFilter0.out
@@ -1,5 +1,6 @@
 == Abstract Syntax Tree ==
-Filter(As(Root(ArraySeq((count,Integer), (word,String))), a,b), ('a * 2) === 0)
+LogicalFilter(condition=[=(MOD($0, 2), 0)])
+  LogicalTableScan(table=[[DataSetTable_0]])
 
 == Physical Execution Plan ==
 Stage 3 : Data Source
@@ -7,14 +8,14 @@ Stage 3 : Data Source
        Partitioning : RANDOM_PARTITIONED
 
        Stage 2 : Map
-               content : Map at select('count as 'count,'word as 'word)
+               content : from: (a, b)
                ship_strategy : Forward
                exchange_mode : PIPELINED
                driver_strategy : Map
                Partitioning : RANDOM_PARTITIONED
 
-               Stage 1 : Filter
-                       content : ('a * 2) === 0
+               Stage 1 : FlatMap
+                       content : where: (=(MOD(a, 2), 0)), select: (a, b)
                        ship_strategy : Forward
                        exchange_mode : PIPELINED
                        driver_strategy : FlatMap

http://git-wip-us.apache.org/repos/asf/flink/blob/89d48f5c/flink-libraries/flink-table/src/test/scala/resources/testFilter1.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testFilter1.out b/flink-libraries/flink-table/src/test/scala/resources/testFilter1.out
index 83378e6..ea76faa 100644
--- a/flink-libraries/flink-table/src/test/scala/resources/testFilter1.out
+++ b/flink-libraries/flink-table/src/test/scala/resources/testFilter1.out
@@ -1,5 +1,6 @@
 == Abstract Syntax Tree ==
-Filter(As(Root(ArraySeq((count,Integer), (word,String))), a,b), ('a * 2) === 0)
+LogicalFilter(condition=[=(MOD($0, 2), 0)])
+  LogicalTableScan(table=[[DataSetTable_0]])
 
 == Physical Execution Plan ==
 Stage 3 : Data Source
@@ -24,7 +25,7 @@ Stage 3 : Data Source
        Filter Factor : (none)
 
        Stage 2 : Map
-               content : Map at select('count as 'count,'word as 'word)
+               content : from: (a, b)
                ship_strategy : Forward
                exchange_mode : PIPELINED
                driver_strategy : Map
@@ -47,8 +48,8 @@ Stage 3 : Data Source
                Avg. Output Record Size (bytes) : (none)
                Filter Factor : (none)
 
-               Stage 1 : Filter
-                       content : ('a * 2) === 0
+               Stage 1 : FlatMap
+                       content : where: (=(MOD(a, 2), 0)), select: (a, b)
                        ship_strategy : Forward
                        exchange_mode : PIPELINED
                        driver_strategy : FlatMap
@@ -58,8 +59,8 @@ Stage 3 : Data Source
                        Order : (none)
                        Grouping : not grouped
                        Uniqueness : not unique
-                       Est. Output Size : 0.0
-                       Est. Cardinality : 0.0
+                       Est. Output Size : (unknown)
+                       Est. Cardinality : (unknown)
                        Network : 0.0
                        Disk I/O : 0.0
                        CPU : 0.0
@@ -81,8 +82,8 @@ Stage 3 : Data Source
                                Order : (none)
                                Grouping : not grouped
                                Uniqueness : not unique
-                               Est. Output Size : 0.0
-                               Est. Cardinality : 0.0
+                               Est. Output Size : (unknown)
+                               Est. Cardinality : (unknown)
                                Network : 0.0
                                Disk I/O : 0.0
                                CPU : 0.0

http://git-wip-us.apache.org/repos/asf/flink/blob/89d48f5c/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out b/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out
index e6e30be..85b815d 100644
--- a/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out
+++ b/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out
@@ -1,39 +1,50 @@
 == Abstract Syntax Tree ==
-Select(Filter(Join(As(Root(ArraySeq((count,Integer), (word,String))), a,b), As(Root(ArraySeq((count,Integer), (word,String))), c,d)), 'b === 'd), 'a,'c)
+LogicalProject(a=[$0], c=[$2])
+  LogicalFilter(condition=[=($1, $3)])
+    LogicalJoin(condition=[true], joinType=[inner])
+      LogicalTableScan(table=[[DataSetTable_0]])
+      LogicalTableScan(table=[[DataSetTable_1]])
 
 == Physical Execution Plan ==
-Stage 3 : Data Source
+Stage 4 : Data Source
        content : collect elements with CollectionInputFormat
        Partitioning : RANDOM_PARTITIONED
 
-       Stage 2 : Map
-               content : Map at select('count as 'count,'word as 'word)
+       Stage 3 : Map
+               content : from: (a, b)
                ship_strategy : Forward
                exchange_mode : PIPELINED
                driver_strategy : Map
                Partitioning : RANDOM_PARTITIONED
 
-Stage 5 : Data Source
+Stage 6 : Data Source
        content : collect elements with CollectionInputFormat
        Partitioning : RANDOM_PARTITIONED
 
-       Stage 4 : Map
-               content : Map at select('count as 'count,'word as 'word)
+       Stage 5 : Map
+               content : from: (c, d)
                ship_strategy : Forward
                exchange_mode : PIPELINED
                driver_strategy : Map
                Partitioning : RANDOM_PARTITIONED
 
-               Stage 1 : Join
-                       content : Join at 'b === 'd
+               Stage 2 : Join
+                       content : where: (=(b, d)), join: (a, b, c, d)
                        ship_strategy : Hash Partition on [1]
                        exchange_mode : PIPELINED
-                       driver_strategy : Hybrid Hash (build: Map at select('count as 'count,'word as 'word))
+                       driver_strategy : Hybrid Hash (build: from: (a, b))
                        Partitioning : RANDOM_PARTITIONED
 
-                       Stage 0 : Data Sink
-                               content : org.apache.flink.api.java.io.DiscardingOutputFormat
+                       Stage 1 : FlatMap
+                               content : select: (a, c AS b)
                                ship_strategy : Forward
                                exchange_mode : PIPELINED
+                               driver_strategy : FlatMap
                                Partitioning : RANDOM_PARTITIONED
 
+                               Stage 0 : Data Sink
+                                       content : org.apache.flink.api.java.io.DiscardingOutputFormat
+                                       ship_strategy : Forward
+                                       exchange_mode : PIPELINED
+                                       Partitioning : RANDOM_PARTITIONED
+

http://git-wip-us.apache.org/repos/asf/flink/blob/89d48f5c/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out b/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out
index a8f05dd..e88da82 100644
--- a/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out
+++ b/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out
@@ -1,8 +1,12 @@
 == Abstract Syntax Tree ==
-Select(Filter(Join(As(Root(ArraySeq((count,Integer), (word,String))), a,b), As(Root(ArraySeq((count,Integer), (word,String))), c,d)), 'b === 'd), 'a,'c)
+LogicalProject(a=[$0], c=[$2])
+  LogicalFilter(condition=[=($1, $3)])
+    LogicalJoin(condition=[true], joinType=[inner])
+      LogicalTableScan(table=[[DataSetTable_0]])
+      LogicalTableScan(table=[[DataSetTable_1]])
 
 == Physical Execution Plan ==
-Stage 3 : Data Source
+Stage 4 : Data Source
        content : collect elements with CollectionInputFormat
        Partitioning : RANDOM_PARTITIONED
        Partitioning Order : (none)
@@ -23,8 +27,8 @@ Stage 3 : Data Source
        Avg. Output Record Size (bytes) : (none)
        Filter Factor : (none)
 
-       Stage 2 : Map
-               content : Map at select('count as 'count,'word as 'word)
+       Stage 3 : Map
+               content : from: (a, b)
                ship_strategy : Forward
                exchange_mode : PIPELINED
                driver_strategy : Map
@@ -47,7 +51,7 @@ Stage 3 : Data Source
                Avg. Output Record Size (bytes) : (none)
                Filter Factor : (none)
 
-Stage 5 : Data Source
+Stage 6 : Data Source
        content : collect elements with CollectionInputFormat
        Partitioning : RANDOM_PARTITIONED
        Partitioning Order : (none)
@@ -68,8 +72,8 @@ Stage 5 : Data Source
        Avg. Output Record Size (bytes) : (none)
        Filter Factor : (none)
 
-       Stage 4 : Map
-               content : Map at select('count as 'count,'word as 'word)
+       Stage 5 : Map
+               content : from: (c, d)
                ship_strategy : Forward
                exchange_mode : PIPELINED
                driver_strategy : Map
@@ -92,11 +96,11 @@ Stage 5 : Data Source
                Avg. Output Record Size (bytes) : (none)
                Filter Factor : (none)
 
-               Stage 1 : Join
-                       content : Join at 'b === 'd
+               Stage 2 : Join
+                       content : where: (=(b, d)), join: (a, b, c, d)
                        ship_strategy : Hash Partition on [1]
                        exchange_mode : PIPELINED
-                       driver_strategy : Hybrid Hash (build: Map at select('count as 'count,'word as 'word))
+                       driver_strategy : Hybrid Hash (build: from: (a, b))
                        Partitioning : RANDOM_PARTITIONED
                        Partitioning Order : (none)
                        Uniqueness : not unique
@@ -116,10 +120,11 @@ Stage 5 : Data Source
                        Avg. Output Record Size (bytes) : (none)
                        Filter Factor : (none)
 
-                       Stage 0 : Data Sink
-                               content : org.apache.flink.api.java.io.DiscardingOutputFormat
+                       Stage 1 : FlatMap
+                               content : select: (a, c AS b)
                                ship_strategy : Forward
                                exchange_mode : PIPELINED
+                               driver_strategy : FlatMap
                                Partitioning : RANDOM_PARTITIONED
                                Partitioning Order : (none)
                                Uniqueness : not unique
@@ -139,3 +144,26 @@ Stage 5 : Data Source
                                Avg. Output Record Size (bytes) : (none)
                                Filter Factor : (none)
 
+                               Stage 0 : Data Sink
+                                       content : org.apache.flink.api.java.io.DiscardingOutputFormat
+                                       ship_strategy : Forward
+                                       exchange_mode : PIPELINED
+                                       Partitioning : RANDOM_PARTITIONED
+                                       Partitioning Order : (none)
+                                       Uniqueness : not unique
+                                       Order : (none)
+                                       Grouping : not grouped
+                                       Uniqueness : not unique
+                                       Est. Output Size : (unknown)
+                                       Est. Cardinality : (unknown)
+                                       Network : 0.0
+                                       Disk I/O : 0.0
+                                       CPU : 0.0
+                                       Cumulative Network : (unknown)
+                                       Cumulative Disk I/O : (unknown)
+                                       Cumulative CPU : (unknown)
+                                       Output Size (bytes) : (none)
+                                       Output Cardinality : (none)
+                                       Avg. Output Record Size (bytes) : (none)
+                                       Filter Factor : (none)
+

http://git-wip-us.apache.org/repos/asf/flink/blob/89d48f5c/flink-libraries/flink-table/src/test/scala/resources/testUnion0.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testUnion0.out b/flink-libraries/flink-table/src/test/scala/resources/testUnion0.out
index db9d2f9..8e892c6 100644
--- a/flink-libraries/flink-table/src/test/scala/resources/testUnion0.out
+++ b/flink-libraries/flink-table/src/test/scala/resources/testUnion0.out
@@ -1,5 +1,7 @@
 == Abstract Syntax Tree ==
-Union(Root(ArraySeq((count,Integer), (word,String))), Root(ArraySeq((count,Integer), (word,String))))
+LogicalUnion(all=[true])
+  LogicalTableScan(table=[[DataSetTable_0]])
+  LogicalTableScan(table=[[DataSetTable_1]])
 
 == Physical Execution Plan ==
 Stage 3 : Data Source
@@ -7,7 +9,7 @@ Stage 3 : Data Source
        Partitioning : RANDOM_PARTITIONED
 
        Stage 2 : Map
-               content : Map at select('count as 'count,'word as 'word)
+               content : from: (count, word)
                ship_strategy : Forward
                exchange_mode : PIPELINED
                driver_strategy : Map
@@ -18,7 +20,7 @@ Stage 5 : Data Source
        Partitioning : RANDOM_PARTITIONED
 
        Stage 4 : Map
-               content : Map at select('count as 'count,'word as 'word)
+               content : from: (count, word)
                ship_strategy : Forward
                exchange_mode : PIPELINED
                driver_strategy : Map

http://git-wip-us.apache.org/repos/asf/flink/blob/89d48f5c/flink-libraries/flink-table/src/test/scala/resources/testUnion1.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testUnion1.out b/flink-libraries/flink-table/src/test/scala/resources/testUnion1.out
index 8dc1e53..34892b1 100644
--- a/flink-libraries/flink-table/src/test/scala/resources/testUnion1.out
+++ b/flink-libraries/flink-table/src/test/scala/resources/testUnion1.out
@@ -1,5 +1,7 @@
 == Abstract Syntax Tree ==
-Union(Root(ArraySeq((count,Integer), (word,String))), Root(ArraySeq((count,Integer), (word,String))))
+LogicalUnion(all=[true])
+  LogicalTableScan(table=[[DataSetTable_0]])
+  LogicalTableScan(table=[[DataSetTable_1]])
 
 == Physical Execution Plan ==
 Stage 3 : Data Source
@@ -24,7 +26,7 @@ Stage 3 : Data Source
        Filter Factor : (none)
 
        Stage 2 : Map
-               content : Map at select('count as 'count,'word as 'word)
+               content : from: (count, word)
                ship_strategy : Forward
                exchange_mode : PIPELINED
                driver_strategy : Map
@@ -69,7 +71,7 @@ Stage 5 : Data Source
        Filter Factor : (none)
 
        Stage 4 : Map
-               content : Map at select('count as 'count,'word as 'word)
+               content : from: (count, word)
                ship_strategy : Forward
                exchange_mode : PIPELINED
                driver_strategy : Map
