[FLINK-4623] [table] Add physical execution plan to StreamTableEnvironment explain().
This closes #2720.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d60fe723
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d60fe723
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d60fe723

Branch: refs/heads/master
Commit: d60fe723aa357733c6ad8715b0e8c4e55ab7f52d
Parents: ed6a602
Author: anton solovev <[email protected]>
Authored: Tue Oct 25 15:55:42 2016 +0400
Committer: Fabian Hueske <[email protected]>
Committed: Wed Nov 2 18:30:18 2016 +0100

----------------------------------------------------------------------
 .../flink/api/table/explain/PlanJsonParser.java | 15 ++++++++-----
 .../api/table/StreamTableEnvironment.scala | 18 +++++++++++++---
 .../api/scala/stream/ExplainStreamTest.scala | 22 ++++++++++++++------
 .../test/scala/resources/testFilterStream0.out | 13 ++++++++++++
 .../test/scala/resources/testUnionStream0.out | 16 ++++++++++++++
 5 files changed, 70 insertions(+), 14 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/d60fe723/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/explain/PlanJsonParser.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/explain/PlanJsonParser.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/explain/PlanJsonParser.java
index 3c4d3d9..bd14cd2 100644
--- a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/explain/PlanJsonParser.java
+++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/explain/PlanJsonParser.java
@@ -62,7 +62,7 @@ public class PlanJsonParser {
 			if (dele > -1) {
 				content = tempNode.getContents().substring(0, dele);
 			}
-			
+
 			//replace with certain content if node is dataSource to pass
 			//unit tests, because java and scala use different api to
 			//get input element
@@ -76,8 +76,11 @@ public class PlanJsonParser {
 				printTab(tabCount + 1, pw);
 				pw.print("ship_strategy : " + predecessors.get(0).getShip_strategy() + "\n");
 
-				printTab(tabCount + 1, pw);
-				pw.print("exchange_mode : " + predecessors.get(0).getExchange_mode() + "\n");
+				String mode = predecessors.get(0).getExchange_mode();
+				if (mode != null) {
+					printTab(tabCount + 1, pw);
+					pw.print("exchange_mode : " + mode + "\n");
+				}
 			}
 
 			if (tempNode.getDriver_strategy() != null) {
@@ -85,9 +88,11 @@ public class PlanJsonParser {
 				pw.print("driver_strategy : " + tempNode.getDriver_strategy() + "\n");
 			}
 
-			printTab(tabCount + 1, pw);
-			pw.print(tempNode.getGlobal_properties().get(0).getName() + " : "
+			if (tempNode.getGlobal_properties() != null) {
+				printTab(tabCount + 1, pw);
+				pw.print(tempNode.getGlobal_properties().get(0).getName() + " : "
 					+ tempNode.getGlobal_properties().get(0).getValue() + "\n");
+			}
 
 			if (extended) {
 				List<Global_properties> globalProperties = tempNode.getGlobal_properties();

http://git-wip-us.apache.org/repos/asf/flink/blob/d60fe723/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
index b9e889d..bca8d79 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
@@ -26,6 +26,8 @@ import org.apache.calcite.rel.RelNode
 import org.apache.calcite.sql2rel.RelDecorrelator
 import org.apache.calcite.tools.{Programs, RuleSet}
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.api.table.explain.PlanJsonParser
 import org.apache.flink.api.table.expressions.Expression
 import org.apache.flink.api.table.plan.logical.{CatalogNode, LogicalRelNode}
 import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamConvention, DataStreamRel}
@@ -311,14 +313,24 @@ abstract class StreamTableEnvironment(
     *
     * @param table The table for which the AST and execution plan will be returned.
     */
-   def explain(table: Table): String = {
+  def explain(table: Table): String = {
 
     val ast = RelOptUtil.toString(table.getRelNode)
 
+    val dataStream = translate[Row](table)(TypeExtractor.createTypeInfo(classOf[Row]))
+
+    val env = dataStream.getExecutionEnvironment
+    val jsonSqlPlan = env.getExecutionPlan
+
+    val sqlPlan = PlanJsonParser.getSqlExecutionPlan(jsonSqlPlan, false)
+
     s"== Abstract Syntax Tree ==" +
       System.lineSeparator +
-      ast
-
+      s"$ast" +
+      System.lineSeparator +
+      s"== Physical Execution Plan ==" +
+      System.lineSeparator +
+      s"$sqlPlan"
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d60fe723/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala
index 71500f1..5eebb34 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala
@@ -40,10 +40,12 @@ class ExplainStreamTest
       .toTable(tEnv, 'a, 'b)
       .filter("a % 2 = 0")
 
-    val result = tEnv.explain(table).replaceAll("\\r\\n", "\n")
+    val result = replaceString(tEnv.explain(table))
+
     val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testFilterStream0.out").mkString.replaceAll("\\r\\n", "\n")
-    assertEquals(result, source)
+      "../../src/test/scala/resources/testFilterStream0.out").mkString
+    val expect = replaceString(source)
+    assertEquals(result, expect)
   }
 
   @Test
@@ -55,10 +57,18 @@ class ExplainStreamTest
     val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
     val table = table1.unionAll(table2)
 
-    val result = tEnv.explain(table).replaceAll("\\r\\n", "\n")
+    val result = replaceString(tEnv.explain(table))
+
     val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testUnionStream0.out").mkString.replaceAll("\\r\\n", "\n")
-    assertEquals(result, source)
+      "../../src/test/scala/resources/testUnionStream0.out").mkString
+    val expect = replaceString(source)
+    assertEquals(result, expect)
   }
 
+  def replaceString(s: String): String = {
+    /* Stage {id} is ignored, because id keeps incrementing in test class
+     * while StreamExecutionEnvironment is up
+     */
+    s.replaceAll("\\r\\n", "\n").replaceAll("Stage \\d+", "")
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d60fe723/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out b/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out
index 3fda6de..20ae2b1 100644
--- a/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out
+++ b/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out
@@ -1,3 +1,16 @@
 == Abstract Syntax Tree ==
 LogicalFilter(condition=[=(MOD($0, 2), 0)])
   LogicalTableScan(table=[[_DataStreamTable_0]])
+
+== Physical Execution Plan ==
+Stage 1 : Data Source
+	content : collect elements with CollectionInputFormat
+
+	Stage 2 : Operator
+		content : from: (a, b)
+		ship_strategy : REBALANCE
+
+		Stage 3 : Operator
+			content : where: (=(MOD(a, 2), 0)), select: (a, b)
+			ship_strategy : FORWARD
+

http://git-wip-us.apache.org/repos/asf/flink/blob/d60fe723/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out b/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out
index b2e3000..ac3635d 100644
--- a/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out
+++ b/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out
@@ -2,3 +2,19 @@
 LogicalUnion(all=[true])
   LogicalTableScan(table=[[_DataStreamTable_0]])
   LogicalTableScan(table=[[_DataStreamTable_1]])
+
+== Physical Execution Plan ==
+Stage 1 : Data Source
+	content : collect elements with CollectionInputFormat
+
+Stage 2 : Data Source
+	content : collect elements with CollectionInputFormat
+
+	Stage 3 : Operator
+		content : from: (count, word)
+		ship_strategy : REBALANCE
+
+		Stage 4 : Operator
+			content : from: (count, word)
+			ship_strategy : REBALANCE
+
