Repository: beam Updated Branches: refs/heads/DSL_SQL f96f9f680 -> 25babc999
[BEAM-2288] Refine DSL interface per the design doc of BEAM-2010: 1. Rename BeamSqlRunner to BeamSQLEnvironment; 2. Move pipeline to an external parameter, to integrate with non-DSL components; Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7b3f7a2b Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7b3f7a2b Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7b3f7a2b Branch: refs/heads/DSL_SQL Commit: 7b3f7a2b5bb5017819e8c8c53620d3cf97a38008 Parents: f96f9f6 Author: mingmxu <ming...@ebay.com> Authored: Sun May 14 11:02:37 2017 -0700 Committer: Jean-Baptiste Onofré <jbono...@apache.org> Committed: Wed May 17 21:03:13 2017 +0200 ---------------------------------------------------------------------- .../beam/dsls/sql/BeamSQLEnvironment.java | 142 +++++++++++++++++++ .../beam/dsls/sql/example/BeamSqlExample.java | 23 ++- .../dsls/sql/planner/BeamPipelineCreator.java | 9 +- .../beam/dsls/sql/planner/BeamQueryPlanner.java | 34 ++--- .../beam/dsls/sql/planner/BeamSqlRunner.java | 103 -------------- .../beam/dsls/sql/planner/BasePlanner.java | 9 +- .../sql/planner/BeamGroupByExplainTest.java | 14 +- .../sql/planner/BeamGroupByPipelineTest.java | 19 +-- .../sql/planner/BeamInvalidGroupByTest.java | 10 +- .../BeamPlannerAggregationSubmitTest.java | 17 ++- .../sql/planner/BeamPlannerExplainTest.java | 7 +- .../dsls/sql/planner/BeamPlannerSubmitTest.java | 11 +- .../beam/dsls/sql/rel/BeamSortRelTest.java | 40 ++++-- 13 files changed, 251 insertions(+), 187 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/7b3f7a2b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSQLEnvironment.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSQLEnvironment.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSQLEnvironment.java new file mode 
100644 index 0000000..cdb25f5 --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSQLEnvironment.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql; + +import java.io.Serializable; +import org.apache.beam.dsls.sql.planner.BeamQueryPlanner; +import org.apache.beam.dsls.sql.rel.BeamRelNode; +import org.apache.beam.dsls.sql.schema.BaseBeamTable; +import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.values.PCollection; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.impl.ScalarFunctionImpl; +import org.apache.calcite.sql.parser.SqlParseException; +import org.apache.calcite.tools.Frameworks; +import org.apache.calcite.tools.RelConversionException; +import org.apache.calcite.tools.ValidationException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@code BeamSQLEnvironment} is the integrated environment of BeamSQL. 
+ * It provides runtime context to execute SQL queries as Beam pipeline, + * including table metadata, SQL engine and a Beam pipeline translator. + * + * <h1>1. BeamSQL as DSL</h1> + * <em>BeamSQL as DSL</em> enables developers to embed SQL queries when writing a Beam pipeline. + * A typical pipeline with BeamSQL DSL is: + * <pre> + *{@code +PipelineOptions options = PipelineOptionsFactory... +Pipeline pipeline = Pipeline.create(options); + +//prepare environment of BeamSQL +BeamSQLEnvironment sqlEnv = BeamSQLEnvironment.create(); +//register table metadata +sqlEnv.addTableMetadata(String tableName, BeamSqlTable tableMetadata); +//register UDF +sqlEnv.registerUDF(String functionName, Method udfMethod); + + +//explain a SQL statement, SELECT only, and return as a PCollection; +PCollection<BeamSQLRow> phase1Stream = sqlEnv.explainSQL(pipeline, String sqlStatement); +//A PCollection explained by BeamSQL can be converted into a table, and apply queries on it; +sqlEnv.registerPCollectionAsTable(String tableName, phase1Stream); + +//apply more queries, even based on phase1Stream + +pipeline.run().waitUntilFinish(); + * } + * </pre> + * + * <h1>2. BeamSQL as CLI</h1> + * This feature is on planning, and not ready yet. + * + */ +public class BeamSQLEnvironment implements Serializable { + private static final Logger LOG = LoggerFactory.getLogger(BeamSQLEnvironment.class); + + public static final BeamSQLEnvironment INSTANCE = new BeamSQLEnvironment(); + + private SchemaPlus schema = Frameworks.createRootSchema(true); + private BeamQueryPlanner planner = new BeamQueryPlanner(schema); + + private BeamSQLEnvironment() { + //disable assertions in Calcite. + ClassLoader.getSystemClassLoader().setDefaultAssertionStatus(false); + } + + /** + * Return an instance of {@code BeamSQLEnvironment}. + */ + public static BeamSQLEnvironment create(){ + return INSTANCE; + } + + /** + * Add a schema. 
+ * + */ + public void addSchema(String schemaName, Schema scheme) { + schema.add(schemaName, schema); + } + + /** + * add a {@link BaseBeamTable} to schema repository. + */ + public void addTableMetadata(String tableName, BaseBeamTable tableMetadata) { + schema.add(tableName, tableMetadata); + planner.getSourceTables().put(tableName, tableMetadata); + } + + /* Add a UDF function. + * + * <p>There're two requirements for function {@code methodName}:<br> + * 1. It must be a STATIC method;<br> + * 2. For a primitive parameter, use its wrapper class and handle NULL properly; + */ + public void addUDFFunction(String functionName, Class<?> className, String methodName){ + schema.add(functionName, ScalarFunctionImpl.create(className, methodName)); + } + + /** + * explain and display the execution plan. + */ + public String executionPlan(String sqlString) + throws ValidationException, RelConversionException, SqlParseException { + BeamRelNode exeTree = planner.convertToBeamRel(sqlString); + String beamPlan = RelOptUtil.toString(exeTree); + LOG.info(String.format("beamPlan>\n%s", beamPlan)); + return beamPlan; + } + + /** + * {@code compileBeamPipeline} translate a SQL statement to executed as Beam data flow, + * which is linked with the given {@code pipeline}. The final output stream is returned as + * {@code PCollection} so more operations can be applied. 
+ */ + public PCollection<BeamSQLRow> compileBeamPipeline(String sqlStatement, Pipeline basePipeline) + throws Exception{ + PCollection<BeamSQLRow> resultStream = planner.compileBeamPipeline(sqlStatement, basePipeline); + return resultStream; + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/7b3f7a2b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java index 303835f..2695944 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java @@ -21,9 +21,14 @@ import java.io.Serializable; import java.util.Arrays; import java.util.HashMap; import java.util.Map; -import org.apache.beam.dsls.sql.planner.BeamSqlRunner; +import org.apache.beam.dsls.sql.BeamSQLEnvironment; import org.apache.beam.dsls.sql.schema.BaseBeamTable; +import org.apache.beam.dsls.sql.schema.BeamSQLRow; import org.apache.beam.dsls.sql.schema.kafka.BeamKafkaCSVTable; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.values.PCollection; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.rel.type.RelProtoDataType; @@ -67,17 +72,23 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; public class BeamSqlExample implements Serializable { public static void main(String[] args) throws Exception { - BeamSqlRunner runner = new BeamSqlRunner(); - runner.addTable("ORDER_DETAILS", getTable("127.0.0.1:9092", "orders")); - runner.addTable("SUB_ORDER", getTable("127.0.0.1:9092", "sub_orders")); + PipelineOptions options = 
PipelineOptionsFactory.fromArgs(new String[] {}).withValidation() + .as(PipelineOptions.class); // FlinkPipelineOptions.class + options.setJobName("BeamSqlExample"); + Pipeline pipeline = Pipeline.create(options); + + BeamSQLEnvironment runner = BeamSQLEnvironment.create(); + runner.addTableMetadata("ORDER_DETAILS", getTable("127.0.0.1:9092", "orders")); + runner.addTableMetadata("SUB_ORDER", getTable("127.0.0.1:9092", "sub_orders")); // case 2: insert into <table>(<fields>) select STREAM <fields> from // <table> from <clause> String sql = "INSERT INTO SUB_ORDER(order_id, site_id, price) " + "SELECT " + " order_id, site_id, price " + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 and price > 20"; - runner.explainQuery(sql); - runner.submitQuery(sql); + PCollection<BeamSQLRow> outputStream = runner.compileBeamPipeline(sql, pipeline); + + pipeline.run().waitUntilFinish(); } public static BaseBeamTable getTable(String bootstrapServer, String topic) { http://git-wip-us.apache.org/repos/asf/beam/blob/7b3f7a2b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamPipelineCreator.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamPipelineCreator.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamPipelineCreator.java index 98ccb57..1f3ba58 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamPipelineCreator.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamPipelineCreator.java @@ -28,7 +28,6 @@ import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; /** * {@link BeamPipelineCreator} converts a {@link BeamRelNode} tree, into a Beam @@ -44,14 +43,10 @@ public class BeamPipelineCreator { private boolean hasPersistent = false; - 
public BeamPipelineCreator(Map<String, BaseBeamTable> sourceTables) { + public BeamPipelineCreator(Map<String, BaseBeamTable> sourceTables, Pipeline pipeline) { this.sourceTables = sourceTables; + this.pipeline = pipeline; - options = PipelineOptionsFactory.fromArgs(new String[] {}).withValidation() - .as(PipelineOptions.class); // FlinkPipelineOptions.class - options.setJobName("BeamPlanCreator"); - - pipeline = Pipeline.create(options); CoderRegistry cr = pipeline.getCoderRegistry(); cr.registerCoder(BeamSQLRow.class, BeamSqlRowCoder.of()); cr.registerCoder(BeamSQLRecordType.class, BeamSQLRecordTypeCoder.of()); http://git-wip-us.apache.org/repos/asf/beam/blob/7b3f7a2b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java index 9e41555..0a7407c 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java @@ -26,9 +26,9 @@ import java.util.Map; import org.apache.beam.dsls.sql.rel.BeamLogicalConvention; import org.apache.beam.dsls.sql.rel.BeamRelNode; import org.apache.beam.dsls.sql.schema.BaseBeamTable; - +import org.apache.beam.dsls.sql.schema.BeamSQLRow; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.values.PCollection; import org.apache.calcite.adapter.java.JavaTypeFactory; import org.apache.calcite.config.Lex; import org.apache.calcite.jdbc.CalciteSchema; @@ -58,8 +58,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * The core component to handle through a SQL statement, to submit a Beam - * pipeline. 
+ * The core component to handle through a SQL statement, from explain execution plan, + * to generate a Beam pipeline. * */ public class BeamQueryPlanner { @@ -95,29 +95,17 @@ public class BeamQueryPlanner { } /** - * With a Beam pipeline generated in {@link #compileBeamPipeline(String)}, - * submit it to run and wait until finish. - * - */ - public void submitToRun(String sqlStatement) throws Exception { - Pipeline pipeline = compileBeamPipeline(sqlStatement); - - PipelineResult result = pipeline.run(); - result.waitUntilFinish(); - } - - /** - * With the @{@link BeamRelNode} tree generated in - * {@link #convertToBeamRel(String)}, a Beam pipeline is generated. - * + * {@code compileBeamPipeline} translate a SQL statement to executed as Beam data flow, + * which is linked with the given {@code pipeline}. The final output stream is returned as + * {@code PCollection} so more operations can be applied. */ - public Pipeline compileBeamPipeline(String sqlStatement) throws Exception { + public PCollection<BeamSQLRow> compileBeamPipeline(String sqlStatement, Pipeline pipeline) + throws Exception { BeamRelNode relNode = convertToBeamRel(sqlStatement); - BeamPipelineCreator planCreator = new BeamPipelineCreator(sourceTables); - relNode.buildBeamPipeline(planCreator); + BeamPipelineCreator planCreator = new BeamPipelineCreator(sourceTables, pipeline); - return planCreator.getPipeline(); + return relNode.buildBeamPipeline(planCreator); } /** http://git-wip-us.apache.org/repos/asf/beam/blob/7b3f7a2b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlRunner.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlRunner.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlRunner.java deleted file mode 100644 index 95ba5a9..0000000 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlRunner.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * 
Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql.planner; - -import java.io.Serializable; - -import org.apache.beam.dsls.sql.rel.BeamRelNode; -import org.apache.beam.dsls.sql.schema.BaseBeamTable; -import org.apache.calcite.plan.RelOptUtil; -import org.apache.calcite.schema.Schema; -import org.apache.calcite.schema.SchemaPlus; -import org.apache.calcite.schema.impl.ScalarFunctionImpl; -import org.apache.calcite.sql.parser.SqlParseException; -import org.apache.calcite.tools.Frameworks; -import org.apache.calcite.tools.RelConversionException; -import org.apache.calcite.tools.ValidationException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Interface to explain, submit a SQL query. - * - */ -public class BeamSqlRunner implements Serializable { - private static final Logger LOG = LoggerFactory.getLogger(BeamSqlRunner.class); - - private SchemaPlus schema = Frameworks.createRootSchema(true); - - private BeamQueryPlanner planner = new BeamQueryPlanner(schema); - - public BeamSqlRunner() { - //disable assertions in Calcite. - ClassLoader.getSystemClassLoader().setDefaultAssertionStatus(false); - } - - /** - * Add a schema. 
- * - */ - public void addSchema(String schemaName, Schema scheme) { - schema.add(schemaName, schema); - } - - /** - * add a {@link BaseBeamTable} to schema repository. - * - */ - public void addTable(String tableName, BaseBeamTable table) { - schema.add(tableName, table); - planner.getSourceTables().put(tableName, table); - } - - /** - * Add a UDF function. - * - * <p>There're two requirements for function {@code methodName}:<br> - * 1. It must be a STATIC method;<br> - * 2. For a primitive parameter, use its wrapper class and handle NULL properly; - */ - public void addUDFFunction(String functionName, Class<?> className, String methodName){ - schema.add(functionName, ScalarFunctionImpl.create(className, methodName)); - } - - /** - * submit as a Beam pipeline. - * - */ - public void submitQuery(String sqlString) throws Exception { - planner.submitToRun(sqlString); - } - - /** - * explain and display the execution plan. - * - */ - public String explainQuery(String sqlString) - throws ValidationException, RelConversionException, SqlParseException { - BeamRelNode exeTree = planner.convertToBeamRel(sqlString); - String beamPlan = RelOptUtil.toString(exeTree); - System.out.println(String.format("beamPlan>\n%s", beamPlan)); - return beamPlan; - } - - protected BeamQueryPlanner getPlanner() { - return planner; - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/7b3f7a2b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java index 055a4d4..fe8a236 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java @@ -21,6 +21,7 @@ import java.util.Arrays; import java.util.Date; import java.util.HashMap; import java.util.Map; 
+import org.apache.beam.dsls.sql.BeamSQLEnvironment; import org.apache.beam.dsls.sql.schema.BaseBeamTable; import org.apache.beam.dsls.sql.schema.BeamSQLRecordType; import org.apache.beam.dsls.sql.schema.BeamSQLRow; @@ -37,13 +38,13 @@ import org.junit.BeforeClass; * */ public class BasePlanner { - public static BeamSqlRunner runner = new BeamSqlRunner(); + public static BeamSQLEnvironment runner = BeamSQLEnvironment.create(); @BeforeClass public static void prepare() { - runner.addTable("ORDER_DETAILS", getTable()); - runner.addTable("SUB_ORDER", getTable("127.0.0.1:9092", "sub_orders")); - runner.addTable("SUB_ORDER_RAM", getTable()); + runner.addTableMetadata("ORDER_DETAILS", getTable()); + runner.addTableMetadata("SUB_ORDER", getTable("127.0.0.1:9092", "sub_orders")); + runner.addTableMetadata("SUB_ORDER_RAM", getTable()); } private static BaseBeamTable getTable() { http://git-wip-us.apache.org/repos/asf/beam/blob/7b3f7a2b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByExplainTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByExplainTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByExplainTest.java index 4f2b1ba..98d14c3 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByExplainTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByExplainTest.java @@ -33,7 +33,7 @@ public class BeamGroupByExplainTest extends BasePlanner { public void testSimpleGroupExplain() throws Exception { String sql = "SELECT COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 "; - String plan = runner.explainQuery(sql); + String plan = runner.executionPlan(sql); } /** @@ -43,7 +43,7 @@ public class BeamGroupByExplainTest extends BasePlanner { public void testSimpleGroup2Explain() throws Exception { String sql = "SELECT site_id" + ", COUNT(*) " + "FROM 
ORDER_DETAILS " + "WHERE SITE_ID = 0 " + "GROUP BY site_id"; - String plan = runner.explainQuery(sql); + String plan = runner.executionPlan(sql); } /** @@ -54,7 +54,7 @@ public class BeamGroupByExplainTest extends BasePlanner { String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id" + ", TUMBLE(order_time, INTERVAL '1' HOUR)"; - String plan = runner.explainQuery(sql); + String plan = runner.executionPlan(sql); } /** @@ -66,7 +66,7 @@ public class BeamGroupByExplainTest extends BasePlanner { + "TUMBLE_START(order_time, INTERVAL '1' HOUR, TIME '00:00:01')" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id" + ", TUMBLE(order_time, INTERVAL '1' HOUR, TIME '00:00:01')"; - String plan = runner.explainQuery(sql); + String plan = runner.executionPlan(sql); } /** @@ -77,7 +77,7 @@ public class BeamGroupByExplainTest extends BasePlanner { String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id" + ", HOP(order_time, INTERVAL '5' MINUTE, INTERVAL '1' HOUR)"; - String plan = runner.explainQuery(sql); + String plan = runner.executionPlan(sql); } /** @@ -88,7 +88,7 @@ public class BeamGroupByExplainTest extends BasePlanner { String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id" + ", SESSION(order_time, INTERVAL '5' MINUTE)"; - String plan = runner.explainQuery(sql); + String plan = runner.executionPlan(sql); } /** @@ -99,6 +99,6 @@ public class BeamGroupByExplainTest extends BasePlanner { runner.addUDFFunction("negative", BeamSqlUdfExpressionTest.UdfFn.class, "negative"); String sql = "select site_id, negative(site_id) as nsite_id from ORDER_DETAILS"; - String plan = runner.explainQuery(sql); + String plan = runner.executionPlan(sql); } } 
http://git-wip-us.apache.org/repos/asf/beam/blob/7b3f7a2b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java index 71dcf73..5101c98 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java @@ -18,7 +18,9 @@ package org.apache.beam.dsls.sql.planner; import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlUdfExpressionTest; -import org.apache.beam.sdk.Pipeline; +import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.PCollection; import org.junit.Test; /** @@ -26,6 +28,7 @@ import org.junit.Test; * */ public class BeamGroupByPipelineTest extends BasePlanner { + public final TestPipeline pipeline = TestPipeline.create(); /** * GROUP-BY without window operation, and grouped fields. 
@@ -34,7 +37,7 @@ public class BeamGroupByPipelineTest extends BasePlanner { public void testSimpleGroupExplain() throws Exception { String sql = "SELECT COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 "; - Pipeline pipeline = runner.getPlanner().compileBeamPipeline(sql); + PCollection<BeamSQLRow> outputStream = runner.compileBeamPipeline(sql, pipeline); } /** @@ -44,7 +47,7 @@ public class BeamGroupByPipelineTest extends BasePlanner { public void testSimpleGroup2Explain() throws Exception { String sql = "SELECT site_id" + ", COUNT(*) " + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 " + "GROUP BY site_id"; - Pipeline pipeline = runner.getPlanner().compileBeamPipeline(sql); + PCollection<BeamSQLRow> outputStream = runner.compileBeamPipeline(sql, pipeline); } /** @@ -55,7 +58,7 @@ public class BeamGroupByPipelineTest extends BasePlanner { String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id" + ", TUMBLE(order_time, INTERVAL '1' HOUR)"; - Pipeline pipeline = runner.getPlanner().compileBeamPipeline(sql); + PCollection<BeamSQLRow> outputStream = runner.compileBeamPipeline(sql, pipeline); } /** @@ -67,7 +70,7 @@ public class BeamGroupByPipelineTest extends BasePlanner { + "TUMBLE_START(order_time, INTERVAL '1' HOUR, TIME '00:00:01')" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id" + ", TUMBLE(order_time, INTERVAL '1' HOUR, TIME '00:00:01')"; - Pipeline pipeline = runner.getPlanner().compileBeamPipeline(sql); + PCollection<BeamSQLRow> outputStream = runner.compileBeamPipeline(sql, pipeline); } /** @@ -78,7 +81,7 @@ public class BeamGroupByPipelineTest extends BasePlanner { String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id" + ", HOP(order_time, INTERVAL '5' MINUTE, INTERVAL '1' HOUR)"; - Pipeline pipeline = 
runner.getPlanner().compileBeamPipeline(sql); + PCollection<BeamSQLRow> outputStream = runner.compileBeamPipeline(sql, pipeline); } /** @@ -89,7 +92,7 @@ public class BeamGroupByPipelineTest extends BasePlanner { String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id" + ", SESSION(order_time, INTERVAL '5' MINUTE)"; - Pipeline pipeline = runner.getPlanner().compileBeamPipeline(sql); + PCollection<BeamSQLRow> outputStream = runner.compileBeamPipeline(sql, pipeline); } /** @@ -100,7 +103,7 @@ public class BeamGroupByPipelineTest extends BasePlanner { runner.addUDFFunction("negative", BeamSqlUdfExpressionTest.UdfFn.class, "negative"); String sql = "select site_id, negative(site_id) as nsite_id from ORDER_DETAILS"; - Pipeline pipeline = runner.getPlanner().compileBeamPipeline(sql); + PCollection<BeamSQLRow> outputStream = runner.compileBeamPipeline(sql, pipeline); } } http://git-wip-us.apache.org/repos/asf/beam/blob/7b3f7a2b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamInvalidGroupByTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamInvalidGroupByTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamInvalidGroupByTest.java index 83ab871..72b5bf7 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamInvalidGroupByTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamInvalidGroupByTest.java @@ -17,7 +17,11 @@ */ package org.apache.beam.dsls.sql.planner; +import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.PCollection; import org.apache.calcite.tools.ValidationException; +import org.junit.Rule; import org.junit.Test; /** @@ -25,12 +29,14 @@ import org.junit.Test; * */ public class BeamInvalidGroupByTest extends BasePlanner { + 
@Rule + public final TestPipeline pipeline = TestPipeline.create(); @Test(expected = ValidationException.class) public void testTumble2Explain() throws Exception { String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 " + "GROUP BY order_id" + ", TUMBLE(order_time, INTERVAL '1' HOUR)"; - String plan = runner.explainQuery(sql); + PCollection<BeamSQLRow> outputStream = runner.compileBeamPipeline(sql, pipeline); } @Test(expected = ValidationException.class) @@ -38,7 +44,7 @@ public class BeamInvalidGroupByTest extends BasePlanner { String sql = "SELECT order_id, site_id, TUMBLE(order_time, INTERVAL '1' HOUR)" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id" + ", TUMBLE(order_time, INTERVAL '1' HOUR)"; - String plan = runner.explainQuery(sql); + PCollection<BeamSQLRow> outputStream = runner.compileBeamPipeline(sql, pipeline); } } http://git-wip-us.apache.org/repos/asf/beam/blob/7b3f7a2b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java index 39a5614..22f1848 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java @@ -21,16 +21,18 @@ import java.text.DateFormat; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Arrays; +import org.apache.beam.dsls.sql.BeamSQLEnvironment; import org.apache.beam.dsls.sql.schema.BaseBeamTable; import org.apache.beam.dsls.sql.schema.BeamSQLRecordType; import org.apache.beam.dsls.sql.schema.BeamSQLRow; -import org.apache.beam.sdk.Pipeline; 
+import org.apache.beam.sdk.testing.TestPipeline; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.rel.type.RelProtoDataType; import org.apache.calcite.sql.type.SqlTypeName; import org.junit.Assert; import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.Test; /** @@ -39,12 +41,15 @@ import org.junit.Test; */ public class BeamPlannerAggregationSubmitTest { public static DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - public static BeamSqlRunner runner = new BeamSqlRunner(); + public static BeamSQLEnvironment runner = BeamSQLEnvironment.create(); + + @Rule + public final TestPipeline pipeline = TestPipeline.create(); @BeforeClass public static void prepare() throws ParseException { - runner.addTable("ORDER_DETAILS", getOrderTable()); - runner.addTable("ORDER_SUMMARY", getSummaryTable()); + runner.addTableMetadata("ORDER_DETAILS", getOrderTable()); + runner.addTableMetadata("ORDER_SUMMARY", getSummaryTable()); } private static BaseBeamTable getOrderTable() throws ParseException { @@ -108,7 +113,7 @@ public class BeamPlannerAggregationSubmitTest { + "WHERE SITE_ID = 1 " + "GROUP BY site_id" + ", TUMBLE(order_time, INTERVAL '1' HOUR, TIME '00:00:01')"; - Pipeline pipeline = runner.getPlanner().compileBeamPipeline(sql); + runner.compileBeamPipeline(sql, pipeline); pipeline.run().waitUntilFinish(); @@ -125,7 +130,7 @@ public class BeamPlannerAggregationSubmitTest { + "SELECT site_id, COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 " + "GROUP BY site_id"; - Pipeline pipeline = runner.getPlanner().compileBeamPipeline(sql); + runner.compileBeamPipeline(sql, pipeline); pipeline.run().waitUntilFinish(); http://git-wip-us.apache.org/repos/asf/beam/blob/7b3f7a2b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java ---------------------------------------------------------------------- diff --git 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java index 9b6b20a..1355d5d 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java @@ -25,11 +25,10 @@ import org.junit.Test; * */ public class BeamPlannerExplainTest extends BasePlanner { - @Test public void selectAll() throws Exception { String sql = "SELECT * FROM ORDER_DETAILS"; - String plan = runner.explainQuery(sql); + String plan = runner.executionPlan(sql); String expectedPlan = "BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2], order_time=[$3])\n" @@ -41,7 +40,7 @@ public class BeamPlannerExplainTest extends BasePlanner { public void selectWithFilter() throws Exception { String sql = "SELECT " + " order_id, site_id, price " + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 and price > 20"; - String plan = runner.explainQuery(sql); + String plan = runner.executionPlan(sql); String expectedPlan = "BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2])\n" + " BeamFilterRel(condition=[AND(=($1, 0), >($2, 20))])\n" @@ -54,7 +53,7 @@ public class BeamPlannerExplainTest extends BasePlanner { String sql = "INSERT INTO SUB_ORDER(order_id, site_id, price) " + "SELECT " + " order_id, site_id, price " + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 and price > 20"; - String plan = runner.explainQuery(sql); + String plan = runner.executionPlan(sql); String expectedPlan = "BeamIOSinkRel(table=[[SUB_ORDER]], operation=[INSERT], flattened=[true])\n" http://git-wip-us.apache.org/repos/asf/beam/blob/7b3f7a2b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java index 5435049..17cea27 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java @@ -17,8 +17,11 @@ */ package org.apache.beam.dsls.sql.planner; -import org.apache.beam.sdk.Pipeline; +import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.PCollection; import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; /** @@ -26,12 +29,16 @@ import org.junit.Test; * */ public class BeamPlannerSubmitTest extends BasePlanner { + @Rule + public final TestPipeline pipeline = TestPipeline.create(); + @Test public void insertSelectFilter() throws Exception { String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT " + " order_id, site_id, price " + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 and price > 20"; - Pipeline pipeline = runner.getPlanner().compileBeamPipeline(sql); + + PCollection<BeamSQLRow> outputStream = runner.compileBeamPipeline(sql, pipeline); pipeline.run().waitUntilFinish(); http://git-wip-us.apache.org/repos/asf/beam/blob/7b3f7a2b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java index 11cec51..864d4b7 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java @@ -20,20 +20,24 @@ package org.apache.beam.dsls.sql.rel; import java.util.Date; import java.util.List; - +import org.apache.beam.dsls.sql.BeamSQLEnvironment; import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException; 
-import org.apache.beam.dsls.sql.planner.BeamSqlRunner; import org.apache.beam.dsls.sql.planner.MockedBeamSQLTable; import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.sdk.testing.TestPipeline; import org.apache.calcite.sql.type.SqlTypeName; import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; /** * Test for {@code BeamSortRel}. */ public class BeamSortRelTest { - public static BeamSqlRunner runner = new BeamSqlRunner(); + public static BeamSQLEnvironment runner = BeamSQLEnvironment.create(); + @Rule + public final TestPipeline pipeline = TestPipeline.create(); + private static MockedBeamSQLTable subOrderRamTable = MockedBeamSQLTable.of( SqlTypeName.BIGINT, "order_id", SqlTypeName.INTEGER, "site_id", @@ -65,7 +69,8 @@ public class BeamSortRelTest { + "ORDER BY order_id asc, site_id desc limit 4"; System.out.println(sql); - runner.submitQuery(sql); + runner.compileBeamPipeline(sql, pipeline); + pipeline.run().waitUntilFinish(); assertEquals( MockedBeamSQLTable.of( @@ -81,7 +86,7 @@ public class BeamSortRelTest { @Test public void testOrderBy_nullsFirst() throws Exception { - runner.addTable("ORDER_DETAILS", MockedBeamSQLTable + runner.addTableMetadata("ORDER_DETAILS", MockedBeamSQLTable .of(SqlTypeName.BIGINT, "order_id", SqlTypeName.INTEGER, "site_id", SqlTypeName.DOUBLE, "price", @@ -91,7 +96,7 @@ public class BeamSortRelTest { 2L, 1, 3.0, 2L, null, 4.0, 5L, 5, 5.0)); - runner.addTable("SUB_ORDER_RAM", MockedBeamSQLTable + runner.addTableMetadata("SUB_ORDER_RAM", MockedBeamSQLTable .of(SqlTypeName.BIGINT, "order_id", SqlTypeName.INTEGER, "site_id", SqlTypeName.DOUBLE, "price")); @@ -101,7 +106,8 @@ public class BeamSortRelTest { + "FROM ORDER_DETAILS " + "ORDER BY order_id asc, site_id desc NULLS FIRST limit 4"; - runner.submitQuery(sql); + runner.compileBeamPipeline(sql, pipeline); + pipeline.run().waitUntilFinish(); assertEquals( MockedBeamSQLTable.of( @@ -118,7 +124,7 @@ public class BeamSortRelTest { @Test public void 
testOrderBy_nullsLast() throws Exception { - runner.addTable("ORDER_DETAILS", MockedBeamSQLTable + runner.addTableMetadata("ORDER_DETAILS", MockedBeamSQLTable .of(SqlTypeName.BIGINT, "order_id", SqlTypeName.INTEGER, "site_id", SqlTypeName.DOUBLE, "price", @@ -128,7 +134,7 @@ public class BeamSortRelTest { 2L, 1, 3.0, 2L, null, 4.0, 5L, 5, 5.0)); - runner.addTable("SUB_ORDER_RAM", MockedBeamSQLTable + runner.addTableMetadata("SUB_ORDER_RAM", MockedBeamSQLTable .of(SqlTypeName.BIGINT, "order_id", SqlTypeName.INTEGER, "site_id", SqlTypeName.DOUBLE, "price")); @@ -138,7 +144,8 @@ public class BeamSortRelTest { + "FROM ORDER_DETAILS " + "ORDER BY order_id asc, site_id desc NULLS LAST limit 4"; - runner.submitQuery(sql); + runner.compileBeamPipeline(sql, pipeline); + pipeline.run().waitUntilFinish(); assertEquals( MockedBeamSQLTable.of( @@ -161,7 +168,8 @@ public class BeamSortRelTest { + "FROM ORDER_DETAILS " + "ORDER BY order_id asc, site_id desc limit 4 offset 4"; - runner.submitQuery(sql); + runner.compileBeamPipeline(sql, pipeline); + pipeline.run().waitUntilFinish(); assertEquals( MockedBeamSQLTable.of( @@ -184,7 +192,8 @@ public class BeamSortRelTest { + "FROM ORDER_DETAILS " + "ORDER BY order_id asc, site_id desc limit 11"; - runner.submitQuery(sql); + runner.compileBeamPipeline(sql, pipeline); + pipeline.run().waitUntilFinish(); assertEquals( MockedBeamSQLTable.of( @@ -214,12 +223,13 @@ public class BeamSortRelTest { + "GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)" + "ORDER BY order_id asc limit 11"; - runner.submitQuery(sql); + TestPipeline pipeline = TestPipeline.create(); + runner.compileBeamPipeline(sql, pipeline); } public static void prepare() { - runner.addTable("ORDER_DETAILS", orderDetailTable); - runner.addTable("SUB_ORDER_RAM", subOrderRamTable); + runner.addTableMetadata("ORDER_DETAILS", orderDetailTable); + runner.addTableMetadata("SUB_ORDER_RAM", subOrderRamTable); } private void assertEquals(List<BeamSQLRow> rows1, List<BeamSQLRow> 
rows2) {