[FLINK-3603][tableAPI] Enable and fix Table API explain. This closes #1783
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/89d48f5c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/89d48f5c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/89d48f5c Branch: refs/heads/master Commit: 89d48f5ccd8eb423a022ce7ad124ffa95c535268 Parents: a779d5c Author: Fabian Hueske <fhue...@apache.org> Authored: Fri Mar 11 00:53:56 2016 +0100 Committer: vasia <va...@apache.org> Committed: Fri Mar 18 14:44:51 2016 +0100 ---------------------------------------------------------------------- .../api/table/plan/TranslationContext.scala | 2 + .../plan/nodes/dataset/DataSetAggregate.scala | 1 - .../table/plan/nodes/dataset/DataSetJoin.scala | 2 +- .../org/apache/flink/api/table/table.scala | 38 ++++--- .../api/java/table/test/SqlExplainITCase.java | 61 +++++----- .../api/scala/table/test/SqlExplainITCase.scala | 102 ----------------- .../api/scala/table/test/SqlExplainTest.scala | 110 +++++++++++++++++++ .../src/test/scala/resources/testFilter0.out | 9 +- .../src/test/scala/resources/testFilter1.out | 17 +-- .../src/test/scala/resources/testJoin0.out | 35 ++++-- .../src/test/scala/resources/testJoin1.out | 52 +++++++-- .../src/test/scala/resources/testUnion0.out | 8 +- .../src/test/scala/resources/testUnion1.out | 8 +- 13 files changed, 254 insertions(+), 191 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/89d48f5c/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala index 5e8e1bc..31541ad 100644 --- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala @@ -61,6 +61,8 @@ object TranslationContext { relBuilder = RelBuilder.create(frameworkConfig) + nameCntr.set(0) + } def addDataSet(newTable: DataSetTable[_]): String = { http://git-wip-us.apache.org/repos/asf/flink/blob/89d48f5c/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala index 6bf7309..d9a0a93 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala @@ -93,7 +93,6 @@ class DataSetAggregate( val rowTypeInfo = new RowTypeInfo(fieldTypes, rowType.getFieldNames.asScala) val aggString = aggregationToString - val rowTypeInfo = new RowTypeInfo(fieldTypes) val mappedInput = inputDS.map(aggregateResult._1).name(s"prepare $aggString") val groupReduceFunction = aggregateResult._2 http://git-wip-us.apache.org/repos/asf/flink/blob/89d48f5c/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala index 5c1f9bb..ffb3e1b 100644 --- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala @@ -156,7 +156,7 @@ class DataSetJoin( val condString = s"where: (${getExpressionString(joinCondition, inFields, None)})" val outFieldString = s"join: (${rowType.getFieldNames.asScala.toList.mkString(", ")})" - condString + ", "+outFieldString + condString + ", " + outFieldString } } http://git-wip-us.apache.org/repos/asf/flink/blob/89d48f5c/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala index 43c097e..53c63cb 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala @@ -17,6 +17,7 @@ */ package org.apache.flink.api.table +import org.apache.calcite.plan.RelOptUtil import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.`type`.RelDataTypeField import org.apache.calcite.rel.core.JoinRelType @@ -26,11 +27,16 @@ import org.apache.calcite.sql.SqlKind import org.apache.calcite.tools.RelBuilder import org.apache.calcite.tools.RelBuilder.{AggCall, GroupKey} import org.apache.calcite.util.NlsString +import org.apache.flink.api.java.io.DiscardingOutputFormat +import org.apache.flink.api.table.explain.PlanJsonParser import org.apache.flink.api.table.plan.{PlanGenException, RexNodeTranslator} import RexNodeTranslator.{toRexNode, extractAggCalls} import org.apache.flink.api.table.expressions.{Naming, UnresolvedFieldReference, Expression} import org.apache.flink.api.table.parser.ExpressionParser +import org.apache.flink.api.scala._ +import 
org.apache.flink.api.scala.table._ + import scala.collection.JavaConverters._ case class BaseTable( @@ -353,23 +359,25 @@ class Table( } /** - * Get the process of the sql parsing, print AST and physical execution plan.The AST - * show the structure of the supplied statement. The execution plan shows how the table - * referenced by the statement will be scanned. - */ - def explain(extended: Boolean): String = { - - // TODO: enable once toDataSet() is working again - -// val ast = operation -// val dataSet = this.toDataSet[Row] -// val env = dataSet.getExecutionEnvironment -// dataSet.output(new DiscardingOutputFormat[Row]) -// val jasonSqlPlan = env.getExecutionPlan() -// val sqlPlan = PlanJsonParser.getSqlExecutionPlan(jasonSqlPlan, extended) -// val result = "== Abstract Syntax Tree ==\n" + ast + "\n\n" + "== Physical Execution Plan ==" + -// "\n" + sqlPlan -// return result - "" + * Returns the abstract syntax tree (AST) of this table's logical plan followed by the + * physical execution plan of the DataSet program that this table is translated to. + * + * NOTE: to obtain the physical plan, this method converts the table to a DataSet and adds a + * DiscardingOutputFormat sink to that DataSet's ExecutionEnvironment. + * + * @param extended if true, the physical plan also lists size/cardinality estimates and cost + * figures (network, disk I/O, CPU) for each operator + * @return the AST and the physical execution plan, each under its own header
+ */ + private[flink] def explain(extended: Boolean): String = { + + val ast = RelOptUtil.toString(relNode) + val dataSet = this.toDataSet[Row] + dataSet.output(new DiscardingOutputFormat[Row]) + val env = dataSet.getExecutionEnvironment + val jsonSqlPlan = env.getExecutionPlan() + val sqlPlan = PlanJsonParser.getSqlExecutionPlan(jsonSqlPlan, extended) + "== Abstract Syntax Tree ==\n" + ast + "\n" + "== Physical Execution Plan ==" + + "\n" + sqlPlan } http://git-wip-us.apache.org/repos/asf/flink/blob/89d48f5c/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SqlExplainITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SqlExplainITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SqlExplainITCase.java index da57c6e..9e09664 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SqlExplainITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SqlExplainITCase.java @@ -22,7 +22,9 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.table.TableEnvironment; import org.apache.flink.api.table.Table; -import org.junit.Ignore; +import org.apache.flink.api.table.plan.TranslationContext; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.Before; import org.junit.Test; import java.io.File; @@ -30,7 +32,11 @@ import java.util.Scanner; import static org.junit.Assert.assertEquals; -public class SqlExplainITCase { +public class SqlExplainITCase extends MultipleProgramsTestBase { + + public SqlExplainITCase() { + super(TestExecutionMode.CLUSTER); + } private static String testFilePath = SqlExplainITCase.class.getResource("/").getFile(); @@ -47,10 +53,14 @@ public class
SqlExplainITCase { } } - @Ignore + @Before + public void resetContext() { + TranslationContext.reset(); + } + @Test - public void testGroupByWithoutExtended() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + public void testFilterWithoutExtended() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = new TableEnvironment(); DataSet<WC> input = env.fromElements( @@ -58,7 +68,7 @@ public class SqlExplainITCase { new WC(2,"d"), new WC(3,"d")); - Table table = tableEnv.fromDataSet(input).as("a, b"); + Table table = tableEnv.fromDataSet(input, "count as a, word as b"); String result = table .filter("a % 2 = 0") @@ -66,13 +76,12 @@ public class SqlExplainITCase { String source = new Scanner(new File(testFilePath + "../../src/test/scala/resources/testFilter0.out")) .useDelimiter("\\A").next(); - assertEquals(result, source); + assertEquals(source, result); } - @Ignore @Test - public void testGroupByWithExtended() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + public void testFilterWithExtended() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = new TableEnvironment(); DataSet<WC> input = env.fromElements( @@ -80,7 +89,7 @@ public class SqlExplainITCase { new WC(2, "d"), new WC(3, "d")); - Table table = tableEnv.fromDataSet(input).as("a, b"); + Table table = tableEnv.fromDataSet(input, "count as a, word as b"); String result = table .filter("a % 2 = 0") @@ -88,13 +97,12 @@ public class SqlExplainITCase { String source = new Scanner(new File(testFilePath + "../../src/test/scala/resources/testFilter1.out")) .useDelimiter("\\A").next(); - assertEquals(result, source); + assertEquals(source, result); } - @Ignore @Test public void testJoinWithoutExtended() throws Exception { - ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment(); + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = new TableEnvironment(); DataSet<WC> input1 = env.fromElements( @@ -102,14 +110,14 @@ public class SqlExplainITCase { new WC(1, "d"), new WC(1, "d")); - Table table1 = tableEnv.fromDataSet(input1).as("a, b"); + Table table1 = tableEnv.fromDataSet(input1, "count as a, word as b"); DataSet<WC> input2 = env.fromElements( new WC(1,"d"), new WC(1,"d"), new WC(1,"d")); - Table table2 = tableEnv.fromDataSet(input2).as("c, d"); + Table table2 = tableEnv.fromDataSet(input2, "count as c, word as d"); String result = table1 .join(table2) @@ -119,13 +127,12 @@ public class SqlExplainITCase { String source = new Scanner(new File(testFilePath + "../../src/test/scala/resources/testJoin0.out")) .useDelimiter("\\A").next(); - assertEquals(result, source); + assertEquals(source, result); } - @Ignore @Test public void testJoinWithExtended() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = new TableEnvironment(); DataSet<WC> input1 = env.fromElements( @@ -133,14 +140,14 @@ public class SqlExplainITCase { new WC(1, "d"), new WC(1, "d")); - Table table1 = tableEnv.fromDataSet(input1).as("a, b"); + Table table1 = tableEnv.fromDataSet(input1, "count as a, word as b"); DataSet<WC> input2 = env.fromElements( new WC(1, "d"), new WC(1, "d"), new WC(1, "d")); - Table table2 = tableEnv.fromDataSet(input2).as("c, d"); + Table table2 = tableEnv.fromDataSet(input2, "count as c, word as d"); String result = table1 .join(table2) @@ -150,13 +157,12 @@ public class SqlExplainITCase { String source = new Scanner(new File(testFilePath + "../../src/test/scala/resources/testJoin1.out")) .useDelimiter("\\A").next(); - assertEquals(result, source); + assertEquals(source, result); } - @Ignore @Test public void 
testUnionWithoutExtended() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = new TableEnvironment(); DataSet<WC> input1 = env.fromElements( @@ -179,13 +185,12 @@ public class SqlExplainITCase { String source = new Scanner(new File(testFilePath + "../../src/test/scala/resources/testUnion0.out")) .useDelimiter("\\A").next(); - assertEquals(result, source); + assertEquals(source, result); } - @Ignore @Test public void testUnionWithExtended() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = new TableEnvironment(); DataSet<WC> input1 = env.fromElements( @@ -208,6 +213,6 @@ public class SqlExplainITCase { String source = new Scanner(new File(testFilePath + "../../src/test/scala/resources/testUnion1.out")) .useDelimiter("\\A").next(); - assertEquals(result, source); + assertEquals(source, result); } } http://git-wip-us.apache.org/repos/asf/flink/blob/89d48f5c/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainITCase.scala deleted file mode 100644 index 954970f..0000000 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainITCase.scala +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.scala.table.test - -import org.apache.flink.api.scala._ -import org.apache.flink.api.scala.table._ - -import org.junit._ -import org.junit.Assert.assertEquals - -case class WC(count: Int, word: String) - -class SqlExplainITCase { - - val testFilePath = SqlExplainITCase.this.getClass.getResource("/").getFile - - @Ignore - @Test - def testGroupByWithoutExtended() : Unit = { - val env = ExecutionEnvironment.createLocalEnvironment() - val expr = env.fromElements(WC(1, "hello"), WC(2, "hello"), WC(3, "ciao")).toTable.as('a, 'b) - val result = expr.filter("a % 2 = 0").explain() - val source = scala.io.Source.fromFile(testFilePath + - "../../src/test/scala/resources/testFilter0.out").mkString - assertEquals(result, source) - } - - @Ignore - @Test - def testGroupByWithExtended() : Unit = { - val env = ExecutionEnvironment.createLocalEnvironment() - val expr = env.fromElements(WC(1, "hello"), WC(2, "hello"), WC(3, "ciao")).toTable.as('a, 'b) - val result = expr.filter("a % 2 = 0").explain(true) - val source = scala.io.Source.fromFile(testFilePath + - "../../src/test/scala/resources/testFilter1.out").mkString - assertEquals(result, source) - } - - @Ignore - @Test - def testJoinWithoutExtended() : Unit = { - val env = ExecutionEnvironment.createLocalEnvironment() - val expr1 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "ciao")).toTable.as('a, 'b) - val 
expr2 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "java")).toTable.as('c, 'd) - val result = expr1.join(expr2).where("b = d").select("a, c").explain() - val source = scala.io.Source.fromFile(testFilePath + - "../../src/test/scala/resources/testJoin0.out").mkString - assertEquals(result, source) - } - - @Ignore - @Test - def testJoinWithExtended() : Unit = { - val env = ExecutionEnvironment.createLocalEnvironment() - val expr1 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "ciao")).toTable.as('a, 'b) - val expr2 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "java")).toTable.as('c, 'd) - val result = expr1.join(expr2).where("b = d").select("a, c").explain(true) - val source = scala.io.Source.fromFile(testFilePath + - "../../src/test/scala/resources/testJoin1.out").mkString - assertEquals(result, source) - } - - @Ignore - @Test - def testUnionWithoutExtended() : Unit = { - val env = ExecutionEnvironment.createLocalEnvironment() - val expr1 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "ciao")).toTable - val expr2 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "java")).toTable - val result = expr1.unionAll(expr2).explain() - val source = scala.io.Source.fromFile(testFilePath + - "../../src/test/scala/resources/testUnion0.out").mkString - assertEquals(result, source) - } - - @Ignore - @Test - def testUnionWithExtended() : Unit = { - val env = ExecutionEnvironment.createLocalEnvironment() - val expr1 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "ciao")).toTable - val expr2 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "java")).toTable - val result = expr1.unionAll(expr2).explain(true) - val source = scala.io.Source.fromFile(testFilePath + - "../../src/test/scala/resources/testUnion1.out").mkString - assertEquals(result, source) - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/89d48f5c/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainTest.scala 
---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainTest.scala new file mode 100644 index 0000000..de07b24 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainTest.scala @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.scala.table.test + +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.table.plan.TranslationContext +import org.apache.flink.test.util.MultipleProgramsTestBase + +import org.junit._ +import org.junit.Assert.assertEquals + +case class WC(count: Int, word: String) + +class SqlExplainTest + extends MultipleProgramsTestBase(MultipleProgramsTestBase.TestExecutionMode.CLUSTER) { + + val testFilePath = SqlExplainTest.this.getClass.getResource("/").getFile + + @Before + def resetContext(): Unit = { + TranslationContext.reset() + } + + @Test + def testFilterWithoutExtended() : Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val expr = env.fromElements(WC(1, "hello"), WC(2, "hello"), WC(3, "ciao")) + .as('count as 'a, 'word as 'b) + val result = expr.filter("a % 2 = 0").explain() + val source = scala.io.Source.fromFile(testFilePath + + "../../src/test/scala/resources/testFilter0.out").mkString + assertEquals(source, result) + } + + @Test + def testFilterWithExtended() : Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val expr = env.fromElements(WC(1, "hello"), WC(2, "hello"), WC(3, "ciao")) + .as('count as 'a, 'word as 'b) + val result = expr.filter("a % 2 = 0").explain(true) + val source = scala.io.Source.fromFile(testFilePath + + "../../src/test/scala/resources/testFilter1.out").mkString + assertEquals(source, result) + } + + @Test + def testJoinWithoutExtended() : Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val expr1 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "ciao")) + .as('count as 'a, 'word as 'b) + val expr2 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "java")) + .as('count as 'c, 'word as 'd) + val result = expr1.join(expr2).where("b = d").select("a, c").explain() + val source = scala.io.Source.fromFile(testFilePath + + "../../src/test/scala/resources/testJoin0.out").mkString +
assertEquals(source, result) + } + + @Test + def testJoinWithExtended() : Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val expr1 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "ciao")) + .as('count as 'a, 'word as 'b) + val expr2 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "java")) + .as('count as 'c, 'word as 'd) + val result = expr1.join(expr2).where("b = d").select("a, c").explain(true) + val source = scala.io.Source.fromFile(testFilePath + + "../../src/test/scala/resources/testJoin1.out").mkString + assertEquals(source, result) + } + + @Test + def testUnionWithoutExtended() : Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val expr1 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "ciao")).toTable + val expr2 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "java")).toTable + val result = expr1.unionAll(expr2).explain() + val source = scala.io.Source.fromFile(testFilePath + + "../../src/test/scala/resources/testUnion0.out").mkString + assertEquals(source, result) + } + + @Test + def testUnionWithExtended() : Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val expr1 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "ciao")).toTable + val expr2 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "java")).toTable + val result = expr1.unionAll(expr2).explain(true) + val source = scala.io.Source.fromFile(testFilePath + + "../../src/test/scala/resources/testUnion1.out").mkString + assertEquals(source, result) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/89d48f5c/flink-libraries/flink-table/src/test/scala/resources/testFilter0.out ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/resources/testFilter0.out b/flink-libraries/flink-table/src/test/scala/resources/testFilter0.out index 062fc90..1d0198d 100644 ---
a/flink-libraries/flink-table/src/test/scala/resources/testFilter0.out +++ b/flink-libraries/flink-table/src/test/scala/resources/testFilter0.out @@ -1,5 +1,6 @@ == Abstract Syntax Tree == -Filter(As(Root(ArraySeq((count,Integer), (word,String))), a,b), ('a * 2) === 0) +LogicalFilter(condition=[=(MOD($0, 2), 0)]) + LogicalTableScan(table=[[DataSetTable_0]]) == Physical Execution Plan == Stage 3 : Data Source @@ -7,14 +8,14 @@ Stage 3 : Data Source Partitioning : RANDOM_PARTITIONED Stage 2 : Map - content : Map at select('count as 'count,'word as 'word) + content : from: (a, b) ship_strategy : Forward exchange_mode : PIPELINED driver_strategy : Map Partitioning : RANDOM_PARTITIONED - Stage 1 : Filter - content : ('a * 2) === 0 + Stage 1 : FlatMap + content : where: (=(MOD(a, 2), 0)), select: (a, b) ship_strategy : Forward exchange_mode : PIPELINED driver_strategy : FlatMap http://git-wip-us.apache.org/repos/asf/flink/blob/89d48f5c/flink-libraries/flink-table/src/test/scala/resources/testFilter1.out ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/resources/testFilter1.out b/flink-libraries/flink-table/src/test/scala/resources/testFilter1.out index 83378e6..ea76faa 100644 --- a/flink-libraries/flink-table/src/test/scala/resources/testFilter1.out +++ b/flink-libraries/flink-table/src/test/scala/resources/testFilter1.out @@ -1,5 +1,6 @@ == Abstract Syntax Tree == -Filter(As(Root(ArraySeq((count,Integer), (word,String))), a,b), ('a * 2) === 0) +LogicalFilter(condition=[=(MOD($0, 2), 0)]) + LogicalTableScan(table=[[DataSetTable_0]]) == Physical Execution Plan == Stage 3 : Data Source @@ -24,7 +25,7 @@ Stage 3 : Data Source Filter Factor : (none) Stage 2 : Map - content : Map at select('count as 'count,'word as 'word) + content : from: (a, b) ship_strategy : Forward exchange_mode : PIPELINED driver_strategy : Map @@ -47,8 +48,8 @@ Stage 3 : Data Source Avg. 
Output Record Size (bytes) : (none) Filter Factor : (none) - Stage 1 : Filter - content : ('a * 2) === 0 + Stage 1 : FlatMap + content : where: (=(MOD(a, 2), 0)), select: (a, b) ship_strategy : Forward exchange_mode : PIPELINED driver_strategy : FlatMap @@ -58,8 +59,8 @@ Stage 3 : Data Source Order : (none) Grouping : not grouped Uniqueness : not unique - Est. Output Size : 0.0 - Est. Cardinality : 0.0 + Est. Output Size : (unknown) + Est. Cardinality : (unknown) Network : 0.0 Disk I/O : 0.0 CPU : 0.0 @@ -81,8 +82,8 @@ Stage 3 : Data Source Order : (none) Grouping : not grouped Uniqueness : not unique - Est. Output Size : 0.0 - Est. Cardinality : 0.0 + Est. Output Size : (unknown) + Est. Cardinality : (unknown) Network : 0.0 Disk I/O : 0.0 CPU : 0.0 http://git-wip-us.apache.org/repos/asf/flink/blob/89d48f5c/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out b/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out index e6e30be..85b815d 100644 --- a/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out +++ b/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out @@ -1,39 +1,50 @@ == Abstract Syntax Tree == -Select(Filter(Join(As(Root(ArraySeq((count,Integer), (word,String))), a,b), As(Root(ArraySeq((count,Integer), (word,String))), c,d)), 'b === 'd), 'a,'c) +LogicalProject(a=[$0], c=[$2]) + LogicalFilter(condition=[=($1, $3)]) + LogicalJoin(condition=[true], joinType=[inner]) + LogicalTableScan(table=[[DataSetTable_0]]) + LogicalTableScan(table=[[DataSetTable_1]]) == Physical Execution Plan == -Stage 3 : Data Source +Stage 4 : Data Source content : collect elements with CollectionInputFormat Partitioning : RANDOM_PARTITIONED - Stage 2 : Map - content : Map at select('count as 'count,'word as 'word) + Stage 3 : Map + content : from: (a, b) ship_strategy : Forward 
exchange_mode : PIPELINED driver_strategy : Map Partitioning : RANDOM_PARTITIONED -Stage 5 : Data Source +Stage 6 : Data Source content : collect elements with CollectionInputFormat Partitioning : RANDOM_PARTITIONED - Stage 4 : Map - content : Map at select('count as 'count,'word as 'word) + Stage 5 : Map + content : from: (c, d) ship_strategy : Forward exchange_mode : PIPELINED driver_strategy : Map Partitioning : RANDOM_PARTITIONED - Stage 1 : Join - content : Join at 'b === 'd + Stage 2 : Join + content : where: (=(b, d)), join: (a, b, c, d) ship_strategy : Hash Partition on [1] exchange_mode : PIPELINED - driver_strategy : Hybrid Hash (build: Map at select('count as 'count,'word as 'word)) + driver_strategy : Hybrid Hash (build: from: (a, b)) Partitioning : RANDOM_PARTITIONED - Stage 0 : Data Sink - content : org.apache.flink.api.java.io.DiscardingOutputFormat + Stage 1 : FlatMap + content : select: (a, c AS b) ship_strategy : Forward exchange_mode : PIPELINED + driver_strategy : FlatMap Partitioning : RANDOM_PARTITIONED + Stage 0 : Data Sink + content : org.apache.flink.api.java.io.DiscardingOutputFormat + ship_strategy : Forward + exchange_mode : PIPELINED + Partitioning : RANDOM_PARTITIONED + http://git-wip-us.apache.org/repos/asf/flink/blob/89d48f5c/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out b/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out index a8f05dd..e88da82 100644 --- a/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out +++ b/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out @@ -1,8 +1,12 @@ == Abstract Syntax Tree == -Select(Filter(Join(As(Root(ArraySeq((count,Integer), (word,String))), a,b), As(Root(ArraySeq((count,Integer), (word,String))), c,d)), 'b === 'd), 'a,'c) +LogicalProject(a=[$0], c=[$2]) + 
LogicalFilter(condition=[=($1, $3)]) + LogicalJoin(condition=[true], joinType=[inner]) + LogicalTableScan(table=[[DataSetTable_0]]) + LogicalTableScan(table=[[DataSetTable_1]]) == Physical Execution Plan == -Stage 3 : Data Source +Stage 4 : Data Source content : collect elements with CollectionInputFormat Partitioning : RANDOM_PARTITIONED Partitioning Order : (none) @@ -23,8 +27,8 @@ Stage 3 : Data Source Avg. Output Record Size (bytes) : (none) Filter Factor : (none) - Stage 2 : Map - content : Map at select('count as 'count,'word as 'word) + Stage 3 : Map + content : from: (a, b) ship_strategy : Forward exchange_mode : PIPELINED driver_strategy : Map @@ -47,7 +51,7 @@ Stage 3 : Data Source Avg. Output Record Size (bytes) : (none) Filter Factor : (none) -Stage 5 : Data Source +Stage 6 : Data Source content : collect elements with CollectionInputFormat Partitioning : RANDOM_PARTITIONED Partitioning Order : (none) @@ -68,8 +72,8 @@ Stage 5 : Data Source Avg. Output Record Size (bytes) : (none) Filter Factor : (none) - Stage 4 : Map - content : Map at select('count as 'count,'word as 'word) + Stage 5 : Map + content : from: (c, d) ship_strategy : Forward exchange_mode : PIPELINED driver_strategy : Map @@ -92,11 +96,11 @@ Stage 5 : Data Source Avg. Output Record Size (bytes) : (none) Filter Factor : (none) - Stage 1 : Join - content : Join at 'b === 'd + Stage 2 : Join + content : where: (=(b, d)), join: (a, b, c, d) ship_strategy : Hash Partition on [1] exchange_mode : PIPELINED - driver_strategy : Hybrid Hash (build: Map at select('count as 'count,'word as 'word)) + driver_strategy : Hybrid Hash (build: from: (a, b)) Partitioning : RANDOM_PARTITIONED Partitioning Order : (none) Uniqueness : not unique @@ -116,10 +120,11 @@ Stage 5 : Data Source Avg. 
Output Record Size (bytes) : (none) Filter Factor : (none) - Stage 0 : Data Sink - content : org.apache.flink.api.java.io.DiscardingOutputFormat + Stage 1 : FlatMap + content : select: (a, c AS b) ship_strategy : Forward exchange_mode : PIPELINED + driver_strategy : FlatMap Partitioning : RANDOM_PARTITIONED Partitioning Order : (none) Uniqueness : not unique @@ -139,3 +144,26 @@ Stage 5 : Data Source Avg. Output Record Size (bytes) : (none) Filter Factor : (none) + Stage 0 : Data Sink + content : org.apache.flink.api.java.io.DiscardingOutputFormat + ship_strategy : Forward + exchange_mode : PIPELINED + Partitioning : RANDOM_PARTITIONED + Partitioning Order : (none) + Uniqueness : not unique + Order : (none) + Grouping : not grouped + Uniqueness : not unique + Est. Output Size : (unknown) + Est. Cardinality : (unknown) + Network : 0.0 + Disk I/O : 0.0 + CPU : 0.0 + Cumulative Network : (unknown) + Cumulative Disk I/O : (unknown) + Cumulative CPU : (unknown) + Output Size (bytes) : (none) + Output Cardinality : (none) + Avg. 
Output Record Size (bytes) : (none) + Filter Factor : (none) + http://git-wip-us.apache.org/repos/asf/flink/blob/89d48f5c/flink-libraries/flink-table/src/test/scala/resources/testUnion0.out ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/resources/testUnion0.out b/flink-libraries/flink-table/src/test/scala/resources/testUnion0.out index db9d2f9..8e892c6 100644 --- a/flink-libraries/flink-table/src/test/scala/resources/testUnion0.out +++ b/flink-libraries/flink-table/src/test/scala/resources/testUnion0.out @@ -1,5 +1,7 @@ == Abstract Syntax Tree == -Union(Root(ArraySeq((count,Integer), (word,String))), Root(ArraySeq((count,Integer), (word,String)))) +LogicalUnion(all=[true]) + LogicalTableScan(table=[[DataSetTable_0]]) + LogicalTableScan(table=[[DataSetTable_1]]) == Physical Execution Plan == Stage 3 : Data Source @@ -7,7 +9,7 @@ Stage 3 : Data Source Partitioning : RANDOM_PARTITIONED Stage 2 : Map - content : Map at select('count as 'count,'word as 'word) + content : from: (count, word) ship_strategy : Forward exchange_mode : PIPELINED driver_strategy : Map @@ -18,7 +20,7 @@ Stage 5 : Data Source Partitioning : RANDOM_PARTITIONED Stage 4 : Map - content : Map at select('count as 'count,'word as 'word) + content : from: (count, word) ship_strategy : Forward exchange_mode : PIPELINED driver_strategy : Map http://git-wip-us.apache.org/repos/asf/flink/blob/89d48f5c/flink-libraries/flink-table/src/test/scala/resources/testUnion1.out ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/resources/testUnion1.out b/flink-libraries/flink-table/src/test/scala/resources/testUnion1.out index 8dc1e53..34892b1 100644 --- a/flink-libraries/flink-table/src/test/scala/resources/testUnion1.out +++ b/flink-libraries/flink-table/src/test/scala/resources/testUnion1.out @@ -1,5 +1,7 @@ == Abstract Syntax Tree == 
-Union(Root(ArraySeq((count,Integer), (word,String))), Root(ArraySeq((count,Integer), (word,String)))) +LogicalUnion(all=[true]) + LogicalTableScan(table=[[DataSetTable_0]]) + LogicalTableScan(table=[[DataSetTable_1]]) == Physical Execution Plan == Stage 3 : Data Source @@ -24,7 +26,7 @@ Stage 3 : Data Source Filter Factor : (none) Stage 2 : Map - content : Map at select('count as 'count,'word as 'word) + content : from: (count, word) ship_strategy : Forward exchange_mode : PIPELINED driver_strategy : Map @@ -69,7 +71,7 @@ Stage 5 : Data Source Filter Factor : (none) Stage 4 : Map - content : Map at select('count as 'count,'word as 'word) + content : from: (count, word) ship_strategy : Forward exchange_mode : PIPELINED driver_strategy : Map