Repository: flink Updated Branches: refs/heads/master 5fdc72069 -> 545b72bee
[FLINK-4599] [table] Add 'explain()' also to StreamTableEnvironment This closes #2485. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/545b72be Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/545b72be Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/545b72be Branch: refs/heads/master Commit: 545b72bee9b2297c9d1d2f5d59d6d839378fde92 Parents: 5fdc720 Author: chobeat <simone.robu...@gmail.com> Authored: Fri Sep 9 11:27:41 2016 +0200 Committer: twalthr <twal...@apache.org> Committed: Thu Sep 15 17:45:30 2016 +0200 ---------------------------------------------------------------------- docs/dev/table_api.md | 53 ++++++++++++++++ .../api/table/StreamTableEnvironment.scala | 16 +++++ .../api/scala/stream/ExplainStreamTest.scala | 64 ++++++++++++++++++++ .../test/scala/resources/testFilterStream0.out | 3 + .../test/scala/resources/testUnionStream0.out | 4 ++ 5 files changed, 140 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/545b72be/docs/dev/table_api.md ---------------------------------------------------------------------- diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md index fe3ddc3..b88a7da 100644 --- a/docs/dev/table_api.md +++ b/docs/dev/table_api.md @@ -2501,3 +2501,56 @@ The Table API provides a configuration (the so-called `TableConfig`) to modify r By default, the Table API supports `null` values. Null handling can be disabled to improve performance by setting the `nullCheck` property in the `TableConfig` to `false`. {% top %} + +Explaining a Table +---- +The Table API provides a mechanism to describe the graph of operations that leads to the resulting output. This is done through the `TableEnvironment#explain(table)` method. 
It returns a string describing two graphs: the Abstract Syntax Tree of the relational algebra query and Flink's Execution Plan of the Job. 
+ +Table `explain` is supported for both `BatchTableEnvironment` and `StreamTableEnvironment`. Currently `StreamTableEnvironment` doesn't support the explanation of the Execution Plan. + +The following code shows an example and the corresponding output: + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env); + +DataStream<Tuple2<Integer, String>> stream1 = env.fromElements(new Tuple2<>(1, "hello")); +DataStream<Tuple2<Integer, String>> stream2 = env.fromElements(new Tuple2<>(1, "hello")); + +Table table1 = tEnv.fromDataStream(stream1, "count, word"); +Table table2 = tEnv.fromDataStream(stream2, "count, word"); +Table table = table1.unionAll(table2); + +String explanation = tEnv.explain(table); +System.out.println(explanation); +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) + +val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word) +val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word) +val table = table1.unionAll(table2) + +val explanation: String = tEnv.explain(table) +println(explanation) +{% endhighlight %} +</div> +</div> + +{% highlight text %} +== Abstract Syntax Tree == +LogicalUnion(all=[true]) + LogicalTableScan(table=[[_DataStreamTable_0]]) + LogicalTableScan(table=[[_DataStreamTable_1]]) +{% endhighlight %} + +{% top %} + + + http://git-wip-us.apache.org/repos/asf/flink/blob/545b72be/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala index 4f57ae9..f73cd3f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala @@ -272,4 +272,20 @@ abstract class StreamTableEnvironment( } + /** + * Returns the AST of the Table API and SQL queries that compute the result of the given [[Table]]. + * Explaining the execution plan is not yet supported for streaming, so only the AST is returned. + * + * @param table The table for which the AST will be returned. + */ + def explain(table: Table): String = { + + val ast = RelOptUtil.toString(table.getRelNode) + + "== Abstract Syntax Tree ==" + + System.lineSeparator + + ast + + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/545b72be/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala new file mode 100644 index 0000000..71500f1 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.stream + +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.table.TableEnvironment +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase +import org.junit.Assert.assertEquals +import org.junit._ + +/** Checks the output of `StreamTableEnvironment#explain` against the expected `.out` resources. */ +class ExplainStreamTest extends StreamingMultipleProgramsTestBase { + val testFilePath = ExplainStreamTest.this.getClass.getResource("/").getFile + + @Test + def testFilter(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val table = env.fromElements((1, "hello")) + .toTable(tEnv, 'a, 'b) + .filter("a % 2 = 0") + + assertExplanation("testFilterStream0.out", tEnv.explain(table)) + } + + @Test + def testUnion(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word) + val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word) + val table = table1.unionAll(table2) + + assertExplanation("testUnionStream0.out", tEnv.explain(table)) + } + + /** Asserts that `actual` equals the resource `fileName`, ignoring CRLF vs. LF line endings. 
*/ + private def assertExplanation(fileName: String, actual: String): Unit = { + val source = scala.io.Source.fromFile(testFilePath + + "../../src/test/scala/resources/" + fileName) + val expected = try source.mkString finally source.close() // always release the file handle + assertEquals(expected.replaceAll("\\r\\n", "\n"), actual.replaceAll("\\r\\n", "\n")) + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/545b72be/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out b/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out new file mode 100644 index 0000000..3fda6de --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out @@ -0,0 +1,3 @@ +== Abstract Syntax Tree == +LogicalFilter(condition=[=(MOD($0, 2), 0)]) + LogicalTableScan(table=[[_DataStreamTable_0]]) http://git-wip-us.apache.org/repos/asf/flink/blob/545b72be/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out b/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out new file mode 100644 index 0000000..b2e3000 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out @@ -0,0 +1,4 @@ +== Abstract Syntax Tree == +LogicalUnion(all=[true]) + LogicalTableScan(table=[[_DataStreamTable_0]]) + LogicalTableScan(table=[[_DataStreamTable_1]])