DSL interface for Beam SQL
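
For quick reference, this commit replaces the BeamSQLEnvironment entry point with a DSL-style API: BeamSql.query()/BeamSql.simpleQuery() return PTransforms, tables and UDFs are registered through BeamSqlEnv, and BeamSqlRowCoder now carries the row schema. The sketch below condenses the updated BeamSqlExample included in the diff; it is illustrative only (the wrapper class name BeamSqlDslSketch is not part of the commit), and the API shown is the experimental one introduced here.

    import org.apache.beam.dsls.sql.BeamSql;
    import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
    import org.apache.beam.dsls.sql.schema.BeamSQLRow;
    import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.values.PBegin;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.calcite.sql.type.SqlTypeName;

    public class BeamSqlDslSketch {
      public static void main(String[] args) throws Exception {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.create());

        // define the input row format (columns c1, c2, c3) and one sample row
        BeamSQLRecordType type = new BeamSQLRecordType();
        type.addField("c1", SqlTypeName.INTEGER);
        type.addField("c2", SqlTypeName.VARCHAR);
        type.addField("c3", SqlTypeName.DOUBLE);
        BeamSQLRow row = new BeamSQLRow(type);
        row.addField(0, 1);
        row.addField(1, "row");
        row.addField(2, 1.0);

        // create a source PCollection, using the new schema-aware BeamSqlRowCoder
        PCollection<BeamSQLRow> inputTable = PBegin.in(p)
            .apply(Create.of(row).withCoder(new BeamSqlRowCoder(type)));

        // simpleQuery() registers the single input under the table name in the
        // FROM clause (TABLE_A) and applies the SQL query as a PTransform
        PCollection<BeamSQLRow> result =
            inputTable.apply(BeamSql.simpleQuery("select c2, c3 from TABLE_A where c1=1"));

        p.run().waitUntilFinish();
      }
    }

Queries over multiple tables go through BeamSql.query() applied to a PCollectionTuple keyed by table-name TupleTags, as shown in the BeamSql Javadoc in the diff below.
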
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/680a543d Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/680a543d Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/680a543d Branch: refs/heads/DSL_SQL Commit: 680a543d20d0d8bca25aced0a5f02c38529babf2 Parents: dedabff Author: mingmxu <[email protected]> Authored: Fri May 26 22:07:03 2017 -0700 Committer: Luke Cwik <[email protected]> Committed: Mon Jun 5 16:22:08 2017 -0700 ---------------------------------------------------------------------- .../beam/dsls/sql/BeamSQLEnvironment.java | 142 ---------------- .../java/org/apache/beam/dsls/sql/BeamSql.java | 166 +++++++++++++++++++ .../org/apache/beam/dsls/sql/BeamSqlCli.java | 70 ++++++++ .../org/apache/beam/dsls/sql/BeamSqlEnv.java | 63 +++++++ .../beam/dsls/sql/example/BeamSqlExample.java | 106 +++++------- .../dsls/sql/planner/BeamPipelineCreator.java | 17 +- .../beam/dsls/sql/planner/BeamQueryPlanner.java | 21 ++- .../beam/dsls/sql/rel/BeamAggregationRel.java | 57 +++++-- .../apache/beam/dsls/sql/rel/BeamFilterRel.java | 12 +- .../apache/beam/dsls/sql/rel/BeamIOSinkRel.java | 17 +- .../beam/dsls/sql/rel/BeamIOSourceRel.java | 25 ++- .../beam/dsls/sql/rel/BeamProjectRel.java | 11 +- .../apache/beam/dsls/sql/rel/BeamRelNode.java | 11 +- .../apache/beam/dsls/sql/rel/BeamSortRel.java | 19 ++- .../apache/beam/dsls/sql/rel/BeamValuesRel.java | 10 +- .../beam/dsls/sql/schema/BeamSQLRecordType.java | 22 +++ .../dsls/sql/schema/BeamSQLRecordTypeCoder.java | 87 ---------- .../beam/dsls/sql/schema/BeamSqlRowCoder.java | 34 ++-- .../beam/dsls/sql/planner/BasePlanner.java | 10 +- .../sql/planner/BeamGroupByExplainTest.java | 18 +- .../sql/planner/BeamGroupByPipelineTest.java | 18 +- .../sql/planner/BeamInvalidGroupByTest.java | 5 +- .../BeamPlannerAggregationSubmitTest.java | 12 +- .../sql/planner/BeamPlannerExplainTest.java | 7 +- .../dsls/sql/planner/BeamPlannerSubmitTest.java | 3 +- .../beam/dsls/sql/rel/BeamSortRelTest.java | 28 ++-- .../beam/dsls/sql/rel/BeamValuesRelTest.java | 14 +- .../sql/schema/BeamPCollectionTableTest.java | 18 +- .../dsls/sql/schema/BeamSqlRowCoderTest.java | 2 +- .../transform/BeamAggregationTransformTest.java | 98 ++++++----- 30 files changed, 628 insertions(+), 495 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSQLEnvironment.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSQLEnvironment.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSQLEnvironment.java deleted file mode 100644 index cdb25f5..0000000 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSQLEnvironment.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql; - -import java.io.Serializable; -import org.apache.beam.dsls.sql.planner.BeamQueryPlanner; -import org.apache.beam.dsls.sql.rel.BeamRelNode; -import org.apache.beam.dsls.sql.schema.BaseBeamTable; -import org.apache.beam.dsls.sql.schema.BeamSQLRow; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.values.PCollection; -import org.apache.calcite.plan.RelOptUtil; -import org.apache.calcite.schema.Schema; -import org.apache.calcite.schema.SchemaPlus; -import org.apache.calcite.schema.impl.ScalarFunctionImpl; -import org.apache.calcite.sql.parser.SqlParseException; -import org.apache.calcite.tools.Frameworks; -import org.apache.calcite.tools.RelConversionException; -import org.apache.calcite.tools.ValidationException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * {@code BeamSQLEnvironment} is the integrated environment of BeamSQL. - * It provides runtime context to execute SQL queries as Beam pipeline, - * including table metadata, SQL engine and a Beam pipeline translator. - * - * <h1>1. BeamSQL as DSL</h1> - * <em>BeamSQL as DSL</em> enables developers to embed SQL queries when writing a Beam pipeline. - * A typical pipeline with BeamSQL DSL is: - * <pre> - *{@code -PipelineOptions options = PipelineOptionsFactory... -Pipeline pipeline = Pipeline.create(options); - -//prepare environment of BeamSQL -BeamSQLEnvironment sqlEnv = BeamSQLEnvironment.create(); -//register table metadata -sqlEnv.addTableMetadata(String tableName, BeamSqlTable tableMetadata); -//register UDF -sqlEnv.registerUDF(String functionName, Method udfMethod); - - -//explain a SQL statement, SELECT only, and return as a PCollection; -PCollection<BeamSQLRow> phase1Stream = sqlEnv.explainSQL(pipeline, String sqlStatement); -//A PCollection explained by BeamSQL can be converted into a table, and apply queries on it; -sqlEnv.registerPCollectionAsTable(String tableName, phase1Stream); - -//apply more queries, even based on phase1Stream - -pipeline.run().waitUntilFinish(); - * } - * </pre> - * - * <h1>2. BeamSQL as CLI</h1> - * This feature is on planning, and not ready yet. - * - */ -public class BeamSQLEnvironment implements Serializable { - private static final Logger LOG = LoggerFactory.getLogger(BeamSQLEnvironment.class); - - public static final BeamSQLEnvironment INSTANCE = new BeamSQLEnvironment(); - - private SchemaPlus schema = Frameworks.createRootSchema(true); - private BeamQueryPlanner planner = new BeamQueryPlanner(schema); - - private BeamSQLEnvironment() { - //disable assertions in Calcite. - ClassLoader.getSystemClassLoader().setDefaultAssertionStatus(false); - } - - /** - * Return an instance of {@code BeamSQLEnvironment}. - */ - public static BeamSQLEnvironment create(){ - return INSTANCE; - } - - /** - * Add a schema. - * - */ - public void addSchema(String schemaName, Schema scheme) { - schema.add(schemaName, schema); - } - - /** - * add a {@link BaseBeamTable} to schema repository. 
- */ - public void addTableMetadata(String tableName, BaseBeamTable tableMetadata) { - schema.add(tableName, tableMetadata); - planner.getSourceTables().put(tableName, tableMetadata); - } - - /* Add a UDF function. - * - * <p>There're two requirements for function {@code methodName}:<br> - * 1. It must be a STATIC method;<br> - * 2. For a primitive parameter, use its wrapper class and handle NULL properly; - */ - public void addUDFFunction(String functionName, Class<?> className, String methodName){ - schema.add(functionName, ScalarFunctionImpl.create(className, methodName)); - } - - /** - * explain and display the execution plan. - */ - public String executionPlan(String sqlString) - throws ValidationException, RelConversionException, SqlParseException { - BeamRelNode exeTree = planner.convertToBeamRel(sqlString); - String beamPlan = RelOptUtil.toString(exeTree); - LOG.info(String.format("beamPlan>\n%s", beamPlan)); - return beamPlan; - } - - /** - * {@code compileBeamPipeline} translate a SQL statement to executed as Beam data flow, - * which is linked with the given {@code pipeline}. The final output stream is returned as - * {@code PCollection} so more operations can be applied. - */ - public PCollection<BeamSQLRow> compileBeamPipeline(String sqlStatement, Pipeline basePipeline) - throws Exception{ - PCollection<BeamSQLRow> resultStream = planner.compileBeamPipeline(sqlStatement, basePipeline); - return resultStream; - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java new file mode 100644 index 0000000..8c2c5ad --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.dsls.sql; + +import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException; +import org.apache.beam.dsls.sql.rel.BeamRelNode; +import org.apache.beam.dsls.sql.schema.BeamPCollectionTable; +import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlSelect; +import org.apache.calcite.sql.parser.SqlParseException; +import org.apache.calcite.tools.RelConversionException; +import org.apache.calcite.tools.ValidationException; + +/** + * {@code BeamSql} is the DSL interface of BeamSQL. It translates a SQL query as a + * {@link PTransform}, so developers can use standard SQL queries in a Beam pipeline. + * + * <h1>Beam SQL DSL usage:</h1> + * A typical pipeline with Beam SQL DSL is: + * <pre> + *{@code +PipelineOptions options = PipelineOptionsFactory.create(); +Pipeline p = Pipeline.create(options); + +//create table from TextIO; +TableSchema tableASchema = ...; +PCollection<BeamSqlRow> inputTableA = p.apply(TextIO.read().from("/my/input/patha")) + .apply(BeamSql.fromTextRow(tableASchema)); +TableSchema tableBSchema = ...; +PCollection<BeamSqlRow> inputTableB = p.apply(TextIO.read().from("/my/input/pathb")) + .apply(BeamSql.fromTextRow(tableBSchema)); + +//run a simple query, and register the output as a table in BeamSql; +String sql1 = "select MY_FUNC(c1), c2 from TABLE_A"; +PCollection<BeamSqlRow> outputTableA = inputTableA.apply(BeamSql.simpleQuery(sql1)) + .withUdf("MY_FUNC", myFunc); + +//run a JOIN with one table from TextIO, and one table from another query +PCollection<BeamSqlRow> outputTableB = PCollectionTuple.of( + new TupleTag<BeamSqlRow>("TABLE_O_A"), outputTableA) + .and(new TupleTag<BeamSqlRow>("TABLE_B"), inputTableB) + .apply(BeamSql.query("select * from TABLE_O_A JOIN TABLE_B where ...")); + +//output the final result with TextIO +outputTableB.apply(BeamSql.toTextRow()).apply(TextIO.write().to("/my/output/path")); + +p.run().waitUntilFinish(); + * } + * </pre> + */ +@Experimental +public class BeamSql { + /** + * Transforms a SQL query into a {@link PTransform} representing an equivalent execution plan. + * + * <p>The returned {@link PTransform} can be applied to a {@link PCollectionTuple} representing + * all the input tables and results in a {@code PCollection<BeamSQLRow>} representing the output + * table. The {@link PCollectionTuple} contains the mapping from {@code table names} to + * {@code PCollection<BeamSQLRow>}, each representing an input table. + * + * <p>It is an error to apply a {@link PCollectionTuple} missing any {@code table names} + * referenced within the query. + */ + public static PTransform<PCollectionTuple, PCollection<BeamSQLRow>> query(String sqlQuery) { + return new QueryTransform(sqlQuery); + + } + + /** + * Transforms a SQL query into a {@link PTransform} representing an equivalent execution plan. + * + * <p>This is a simplified form of {@link #query(String)} where the query must reference + * a single input table. 
+ */ + public static PTransform<PCollection<BeamSQLRow>, PCollection<BeamSQLRow>> + simpleQuery(String sqlQuery) throws Exception { + return new SimpleQueryTransform(sqlQuery); + } + + /** + * A {@link PTransform} representing an execution plan for a SQL query. + */ + public static class QueryTransform extends PTransform<PCollectionTuple, PCollection<BeamSQLRow>> { + private String sqlQuery; + public QueryTransform(String sqlQuery) { + this.sqlQuery = sqlQuery; + } + + @Override + public PCollection<BeamSQLRow> expand(PCollectionTuple input) { + BeamRelNode beamRelNode = null; + try { + beamRelNode = BeamSqlEnv.planner.convertToBeamRel(sqlQuery); + } catch (ValidationException | RelConversionException | SqlParseException e) { + throw new IllegalStateException(e); + } + + try { + return beamRelNode.buildBeamPipeline(input); + } catch (Exception e) { + throw new IllegalStateException(e); + } + } + } + + /** + * A {@link PTransform} representing an execution plan for a SQL query referencing + * a single table. + */ + public static class SimpleQueryTransform + extends PTransform<PCollection<BeamSQLRow>, PCollection<BeamSQLRow>> { + private String sqlQuery; + public SimpleQueryTransform(String sqlQuery) { + this.sqlQuery = sqlQuery; + } + + public SimpleQueryTransform withUdf(String udfName){ + throw new BeamSqlUnsupportedException("Pending for UDF support"); + } + + @Override + public PCollection<BeamSQLRow> expand(PCollection<BeamSQLRow> input) { + SqlNode sqlNode; + try { + sqlNode = BeamSqlEnv.planner.parseQuery(sqlQuery); + BeamSqlEnv.planner.getPlanner().close(); + } catch (SqlParseException e) { + throw new IllegalStateException(e); + } + BeamSqlRowCoder inputCoder = (BeamSqlRowCoder) input.getCoder(); + + if (sqlNode instanceof SqlSelect) { + SqlSelect select = (SqlSelect) sqlNode; + String tableName = select.getFrom().toString(); + BeamSqlEnv.registerTable(tableName, + new BeamPCollectionTable(input, inputCoder.getTableSchema().toRelDataType())); + return PCollectionTuple.of(new TupleTag<BeamSQLRow>(tableName), input) + .apply(BeamSql.query(sqlQuery)); + } else { + throw new BeamSqlUnsupportedException(sqlNode.toString()); + } + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java new file mode 100644 index 0000000..6591589 --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.dsls.sql; + +import org.apache.beam.dsls.sql.rel.BeamRelNode; +import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.values.PCollection; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.sql.parser.SqlParseException; +import org.apache.calcite.tools.RelConversionException; +import org.apache.calcite.tools.ValidationException; + +/** + * {@link BeamSqlCli} provides methods to execute Beam SQL with an interactive client. + */ +@Experimental +public class BeamSqlCli { + + /** + * Returns a human readable representation of the query execution plan. + */ + public static String explainQuery(String sqlString) + throws ValidationException, RelConversionException, SqlParseException { + BeamRelNode exeTree = BeamSqlEnv.planner.convertToBeamRel(sqlString); + String beamPlan = RelOptUtil.toString(exeTree); + return beamPlan; + } + + /** + * compile SQL, and return a {@link Pipeline}. + */ + public static PCollection<BeamSQLRow> compilePipeline(String sqlStatement) throws Exception{ + PipelineOptions options = PipelineOptionsFactory.fromArgs(new String[] {}).withValidation() + .as(PipelineOptions.class); // FlinkPipelineOptions.class + options.setJobName("BeamPlanCreator"); + Pipeline pipeline = Pipeline.create(options); + + return compilePipeline(sqlStatement, pipeline); + } + + /** + * compile SQL, and return a {@link Pipeline}. + */ + public static PCollection<BeamSQLRow> compilePipeline(String sqlStatement, Pipeline basePipeline) + throws Exception{ + PCollection<BeamSQLRow> resultStream = + BeamSqlEnv.planner.compileBeamPipeline(sqlStatement, basePipeline); + return resultStream; + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java new file mode 100644 index 0000000..af6c007 --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.dsls.sql; + +import org.apache.beam.dsls.sql.planner.BeamQueryPlanner; +import org.apache.beam.dsls.sql.schema.BaseBeamTable; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.impl.ScalarFunctionImpl; +import org.apache.calcite.tools.Frameworks; + +/** + * {@link BeamSqlEnv} prepares the execution context for {@link BeamSql} and {@link BeamSqlCli}. + * + * <p>It contains a {@link SchemaPlus} which holds the metadata of tables/UDF functions, and + * a {@link BeamQueryPlanner} which parse/validate/optimize/translate input SQL queries. + */ +public class BeamSqlEnv { + public static SchemaPlus schema; + public static BeamQueryPlanner planner; + + static { + schema = Frameworks.createRootSchema(true); + planner = new BeamQueryPlanner(schema); + } + + /** + * Register a UDF function which can be used in SQL expression. + */ + public static void registerUdf(String functionName, Class<?> clazz, String methodName) { + schema.add(functionName, ScalarFunctionImpl.create(clazz, methodName)); + } + + /** + * Registers a {@link BaseBeamTable} which can be used for all subsequent queries. + * + */ + public static void registerTable(String tableName, BaseBeamTable table) { + schema.add(tableName, table); + planner.getSourceTables().put(tableName, table); + } + + /** + * Find {@link BaseBeamTable} by table name. + */ + public static BaseBeamTable findTable(String tableName){ + return planner.getSourceTables().get(tableName); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java index 2695944..6a1b81d 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java @@ -17,93 +17,61 @@ */ package org.apache.beam.dsls.sql.example; -import java.io.Serializable; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; -import org.apache.beam.dsls.sql.BeamSQLEnvironment; -import org.apache.beam.dsls.sql.schema.BaseBeamTable; +import org.apache.beam.dsls.sql.BeamSql; +import org.apache.beam.dsls.sql.schema.BeamSQLRecordType; import org.apache.beam.dsls.sql.schema.BeamSQLRow; -import org.apache.beam.dsls.sql.schema.kafka.BeamKafkaCSVTable; +import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.rel.type.RelProtoDataType; import org.apache.calcite.sql.type.SqlTypeName; -import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * This is one quick example. + * This is a quick example, which uses Beam SQL DSL to create a data pipeline. 
* - * <p>Before start, follow https://kafka.apache.org/quickstart to setup a Kafka - * cluster locally, and run below commands to create required Kafka topics: - * <pre> - * <code> - * bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 \ - * --partitions 1 --topic orders - * bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 \ - * --partitions 1 --topic sub_orders - * </code> - * </pre> - * After run the application, produce several test records: - * <pre> - * <code> - * bin/kafka-console-producer.sh --broker-list localhost:9092 --topic orders - * invalid,record - * 123445,0,100,3413423 - * 234123,3,232,3451231234 - * 234234,0,5,1234123 - * 345234,0,345234.345,3423 - * </code> - * </pre> - * Meanwhile, open another console to see the output: - * <pre> - * <code> - * bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sub_orders - * **Expected : - * 123445,0,100.0 - * 345234,0,345234.345 - * </code> - * </pre> */ -public class BeamSqlExample implements Serializable { +public class BeamSqlExample { + private static final Logger LOG = LoggerFactory.getLogger(BeamSqlExample.class); public static void main(String[] args) throws Exception { - PipelineOptions options = PipelineOptionsFactory.fromArgs(new String[] {}).withValidation() - .as(PipelineOptions.class); // FlinkPipelineOptions.class - options.setJobName("BeamSqlExample"); - Pipeline pipeline = Pipeline.create(options); + PipelineOptions options = PipelineOptionsFactory.create(); + Pipeline p = Pipeline.create(options); - BeamSQLEnvironment runner = BeamSQLEnvironment.create(); - runner.addTableMetadata("ORDER_DETAILS", getTable("127.0.0.1:9092", "orders")); - runner.addTableMetadata("SUB_ORDER", getTable("127.0.0.1:9092", "sub_orders")); + //define the input row format + BeamSQLRecordType type = new BeamSQLRecordType(); + type.addField("c1", SqlTypeName.INTEGER); + type.addField("c2", SqlTypeName.VARCHAR); + type.addField("c3", SqlTypeName.DOUBLE); + BeamSQLRow row = new BeamSQLRow(type); + row.addField(0, 1); + row.addField(1, "row"); + row.addField(2, 1.0); - // case 2: insert into <table>(<fields>) select STREAM <fields> from - // <table> from <clause> - String sql = "INSERT INTO SUB_ORDER(order_id, site_id, price) " + "SELECT " - + " order_id, site_id, price " + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 and price > 20"; + //create a source PCollection with Create.of(); + PCollection<BeamSQLRow> inputTable = PBegin.in(p).apply(Create.of(row) + .withCoder(new BeamSqlRowCoder(type))); - PCollection<BeamSQLRow> outputStream = runner.compileBeamPipeline(sql, pipeline); + //run a simple SQL query over input PCollection; + String sql = "select c2, c3 from TABLE_A where c1=1"; + PCollection<BeamSQLRow> outputStream = inputTable.apply(BeamSql.simpleQuery(sql)); - pipeline.run().waitUntilFinish(); - } - - public static BaseBeamTable getTable(String bootstrapServer, String topic) { - final RelProtoDataType protoRowType = new RelProtoDataType() { + //log out the output record; + outputStream.apply("log_result", + MapElements.<BeamSQLRow, Void>via(new SimpleFunction<BeamSQLRow, Void>() { @Override - public RelDataType apply(RelDataTypeFactory a0) { - return a0.builder().add("order_id", SqlTypeName.BIGINT).add("site_id", SqlTypeName.INTEGER) - .add("price", SqlTypeName.DOUBLE).add("order_time", SqlTypeName.TIMESTAMP).build(); + public Void apply(BeamSQLRow input) { + LOG.info(input.valueInString()); + return null; } - }; - - Map<String, Object> consumerPara = new 
HashMap<String, Object>(); - consumerPara.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); + })); - return new BeamKafkaCSVTable(protoRowType, bootstrapServer, Arrays.asList(topic)) - .updateConsumerProperties(consumerPara); + p.run().waitUntilFinish(); } } http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamPipelineCreator.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamPipelineCreator.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamPipelineCreator.java index 1f3ba58..abdc66c 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamPipelineCreator.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamPipelineCreator.java @@ -18,16 +18,9 @@ package org.apache.beam.dsls.sql.planner; import java.util.Map; - import org.apache.beam.dsls.sql.rel.BeamRelNode; import org.apache.beam.dsls.sql.schema.BaseBeamTable; -import org.apache.beam.dsls.sql.schema.BeamSQLRecordType; -import org.apache.beam.dsls.sql.schema.BeamSQLRecordTypeCoder; -import org.apache.beam.dsls.sql.schema.BeamSQLRow; -import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.CoderRegistry; -import org.apache.beam.sdk.options.PipelineOptions; /** * {@link BeamPipelineCreator} converts a {@link BeamRelNode} tree, into a Beam @@ -37,19 +30,13 @@ import org.apache.beam.sdk.options.PipelineOptions; public class BeamPipelineCreator { private Map<String, BaseBeamTable> sourceTables; - private PipelineOptions options; - private Pipeline pipeline; private boolean hasPersistent = false; - public BeamPipelineCreator(Map<String, BaseBeamTable> sourceTables, Pipeline pipeline) { + public BeamPipelineCreator(Map<String, BaseBeamTable> sourceTables, Pipeline basePipeline) { this.sourceTables = sourceTables; - this.pipeline = pipeline; - - CoderRegistry cr = pipeline.getCoderRegistry(); - cr.registerCoder(BeamSQLRow.class, BeamSqlRowCoder.of()); - cr.registerCoder(BeamSQLRecordType.class, BeamSQLRecordTypeCoder.of()); + this.pipeline = basePipeline; } public Map<String, BaseBeamTable> getSourceTables() { http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java index 0a7407c..6f148d6 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java @@ -22,13 +22,13 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; - import org.apache.beam.dsls.sql.rel.BeamLogicalConvention; import org.apache.beam.dsls.sql.rel.BeamRelNode; import org.apache.beam.dsls.sql.schema.BaseBeamTable; import org.apache.beam.dsls.sql.schema.BeamSQLRow; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.calcite.adapter.java.JavaTypeFactory; import org.apache.calcite.config.Lex; import org.apache.calcite.jdbc.CalciteSchema; @@ -95,17 +95,24 @@ public class BeamQueryPlanner { } /** + * Parse input SQL query, 
and return a {@link SqlNode} as grammar tree. + */ + public SqlNode parseQuery(String sqlQuery) throws SqlParseException{ + return planner.parse(sqlQuery); + } + + /** * {@code compileBeamPipeline} translate a SQL statement to executed as Beam data flow, * which is linked with the given {@code pipeline}. The final output stream is returned as * {@code PCollection} so more operations can be applied. */ - public PCollection<BeamSQLRow> compileBeamPipeline(String sqlStatement, Pipeline pipeline) + public PCollection<BeamSQLRow> compileBeamPipeline(String sqlStatement, Pipeline basePipeline) throws Exception { BeamRelNode relNode = convertToBeamRel(sqlStatement); - BeamPipelineCreator planCreator = new BeamPipelineCreator(sourceTables, pipeline); - - return relNode.buildBeamPipeline(planCreator); + BeamPipelineCreator planCreator = new BeamPipelineCreator(sourceTables, basePipeline); + // the input PCollectionTuple is empty, and be rebuilt in BeamIOSourceRel. + return relNode.buildBeamPipeline(PCollectionTuple.empty(basePipeline)); } /** @@ -155,4 +162,8 @@ public class BeamQueryPlanner { return sourceTables; } + public Planner getPlanner() { + return planner; + } + } http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java index 3e147aa..6914883 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java @@ -18,11 +18,13 @@ package org.apache.beam.dsls.sql.rel; import java.util.List; -import org.apache.beam.dsls.sql.planner.BeamPipelineCreator; import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils; import org.apache.beam.dsls.sql.schema.BeamSQLRecordType; import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; import org.apache.beam.dsls.sql.transform.BeamAggregationTransforms; +import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.ParDo; @@ -34,6 +36,7 @@ import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.calcite.linq4j.Ord; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelTraitSet; @@ -41,6 +44,7 @@ import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.RelWriter; import org.apache.calcite.rel.core.Aggregate; import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.util.ImmutableBitSet; import org.apache.calcite.util.Util; import org.joda.time.Duration; @@ -67,16 +71,17 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode { } @Override - public PCollection<BeamSQLRow> buildBeamPipeline(BeamPipelineCreator planCreator) + public PCollection<BeamSQLRow> buildBeamPipeline(PCollectionTuple inputPCollections) throws Exception { RelNode input = getInput(); String stageName = 
BeamSQLRelUtils.getStageName(this); PCollection<BeamSQLRow> upstream = - BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(planCreator); + BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections); if (windowFieldIdx != -1) { upstream = upstream.apply("assignEventTimestamp", WithTimestamps - .<BeamSQLRow>of(new BeamAggregationTransforms.WindowTimestampFn(windowFieldIdx))); + .<BeamSQLRow>of(new BeamAggregationTransforms.WindowTimestampFn(windowFieldIdx))) + .setCoder(upstream.getCoder()); } PCollection<BeamSQLRow> windowStream = upstream.apply("window", @@ -85,29 +90,59 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode { .withAllowedLateness(allowedLatence) .accumulatingFiredPanes()); - //1. extract fields in group-by key part + BeamSqlRowCoder keyCoder = new BeamSqlRowCoder(exKeyFieldsSchema(input.getRowType())); PCollection<KV<BeamSQLRow, BeamSQLRow>> exGroupByStream = windowStream.apply("exGroupBy", WithKeys - .of(new BeamAggregationTransforms.AggregationGroupByKeyFn(windowFieldIdx, groupSet))); + .of(new BeamAggregationTransforms.AggregationGroupByKeyFn( + windowFieldIdx, groupSet))) + .setCoder(KvCoder.<BeamSQLRow, BeamSQLRow>of(keyCoder, upstream.getCoder())); - //2. apply a GroupByKey. PCollection<KV<BeamSQLRow, Iterable<BeamSQLRow>>> groupedStream = exGroupByStream - .apply("groupBy", GroupByKey.<BeamSQLRow, BeamSQLRow>create()); + .apply("groupBy", GroupByKey.<BeamSQLRow, BeamSQLRow>create()) + .setCoder(KvCoder.<BeamSQLRow, Iterable<BeamSQLRow>>of(keyCoder, + IterableCoder.<BeamSQLRow>of(upstream.getCoder()))); - //3. run aggregation functions + BeamSqlRowCoder aggCoder = new BeamSqlRowCoder(exAggFieldsSchema()); PCollection<KV<BeamSQLRow, BeamSQLRow>> aggregatedStream = groupedStream.apply("aggregation", Combine.<BeamSQLRow, BeamSQLRow, BeamSQLRow>groupedValues( new BeamAggregationTransforms.AggregationCombineFn(getAggCallList(), - BeamSQLRecordType.from(input.getRowType())))); + BeamSQLRecordType.from(input.getRowType())))) + .setCoder(KvCoder.<BeamSQLRow, BeamSQLRow>of(keyCoder, aggCoder)); - //4. flat KV to a single record PCollection<BeamSQLRow> mergedStream = aggregatedStream.apply("mergeRecord", ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord( BeamSQLRecordType.from(getRowType()), getAggCallList()))); + mergedStream.setCoder(new BeamSqlRowCoder(BeamSQLRecordType.from(getRowType()))); return mergedStream; } + /** + * Type of sub-rowrecord used as Group-By keys. + */ + private BeamSQLRecordType exKeyFieldsSchema(RelDataType relDataType) { + BeamSQLRecordType inputRecordType = BeamSQLRecordType.from(relDataType); + BeamSQLRecordType typeOfKey = new BeamSQLRecordType(); + for (int i : groupSet.asList()) { + if (i != windowFieldIdx) { + typeOfKey.addField(inputRecordType.getFieldsName().get(i), + inputRecordType.getFieldsType().get(i)); + } + } + return typeOfKey; + } + + /** + * Type of sub-rowrecord, that represents the list of aggregation fields. 
+ */ + private BeamSQLRecordType exAggFieldsSchema() { + BeamSQLRecordType typeOfAggFields = new BeamSQLRecordType(); + for (AggregateCall ac : getAggCallList()) { + typeOfAggFields.addField(ac.name, ac.type.getSqlTypeName()); + } + return typeOfAggFields; + } + @Override public Aggregate copy(RelTraitSet traitSet, RelNode input, boolean indicator , ImmutableBitSet groupSet, http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java index f2c1bba..3387071 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java @@ -19,12 +19,14 @@ package org.apache.beam.dsls.sql.rel; import org.apache.beam.dsls.sql.interpreter.BeamSQLExpressionExecutor; import org.apache.beam.dsls.sql.interpreter.BeamSQLFnExecutor; -import org.apache.beam.dsls.sql.planner.BeamPipelineCreator; import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils; +import org.apache.beam.dsls.sql.schema.BeamSQLRecordType; import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; import org.apache.beam.dsls.sql.transform.BeamSQLFilterFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; @@ -48,20 +50,20 @@ public class BeamFilterRel extends Filter implements BeamRelNode { } @Override - public PCollection<BeamSQLRow> buildBeamPipeline(BeamPipelineCreator planCreator) + public PCollection<BeamSQLRow> buildBeamPipeline(PCollectionTuple inputPCollections) throws Exception { RelNode input = getInput(); - String stageName = BeamSQLRelUtils.getStageName(this); - PCollection<BeamSQLRow> upstream = BeamSQLRelUtils.getBeamRelInput(input) - .buildBeamPipeline(planCreator); + PCollection<BeamSQLRow> upstream = + BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections); BeamSQLExpressionExecutor executor = new BeamSQLFnExecutor(this); PCollection<BeamSQLRow> filterStream = upstream.apply(stageName, ParDo.of(new BeamSQLFilterFn(getRelTypeName(), executor))); + filterStream.setCoder(new BeamSqlRowCoder(BeamSQLRecordType.from(getRowType()))); return filterStream; } http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java index bc94ab8..f821700 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java @@ -19,12 +19,13 @@ package org.apache.beam.dsls.sql.rel; import com.google.common.base.Joiner; import java.util.List; - -import org.apache.beam.dsls.sql.planner.BeamPipelineCreator; +import org.apache.beam.dsls.sql.BeamSqlEnv; import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils; import org.apache.beam.dsls.sql.schema.BaseBeamTable; import 
org.apache.beam.dsls.sql.schema.BeamSQLRow; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PDone; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptTable; import org.apache.calcite.plan.RelTraitSet; @@ -52,22 +53,24 @@ public class BeamIOSinkRel extends TableModify implements BeamRelNode { } /** - * Note that {@code BeamIOSinkRel} returns the input PCollection. + * Note that {@code BeamIOSinkRel} returns the input PCollection, + * which is the persisted PCollection. */ @Override - public PCollection<BeamSQLRow> buildBeamPipeline(BeamPipelineCreator planCreator) + public PCollection<BeamSQLRow> buildBeamPipeline(PCollectionTuple inputPCollections) throws Exception { + RelNode input = getInput(); String stageName = BeamSQLRelUtils.getStageName(this); PCollection<BeamSQLRow> upstream = - BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(planCreator); + BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections); String sourceName = Joiner.on('.').join(getTable().getQualifiedName()); - BaseBeamTable targetTable = planCreator.getSourceTables().get(sourceName); + BaseBeamTable targetTable = BeamSqlEnv.findTable(sourceName); - upstream.apply(stageName, targetTable.buildIOWriter()); + PDone streamEnd = upstream.apply(stageName, targetTable.buildIOWriter()); return upstream; } http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java index f4d5001..38de41e 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java @@ -18,12 +18,13 @@ package org.apache.beam.dsls.sql.rel; import com.google.common.base.Joiner; - -import org.apache.beam.dsls.sql.planner.BeamPipelineCreator; +import org.apache.beam.dsls.sql.BeamSqlEnv; import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils; import org.apache.beam.dsls.sql.schema.BaseBeamTable; import org.apache.beam.dsls.sql.schema.BeamSQLRow; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptTable; import org.apache.calcite.plan.RelTraitSet; @@ -40,18 +41,24 @@ public class BeamIOSourceRel extends TableScan implements BeamRelNode { } @Override - public PCollection<BeamSQLRow> buildBeamPipeline(BeamPipelineCreator planCreator) + public PCollection<BeamSQLRow> buildBeamPipeline(PCollectionTuple inputPCollections) throws Exception { - String sourceName = Joiner.on('.').join(getTable().getQualifiedName()).replace(".(STREAM)", ""); - - BaseBeamTable sourceTable = planCreator.getSourceTables().get(sourceName); + String sourceName = Joiner.on('.').join(getTable().getQualifiedName()); String stageName = BeamSQLRelUtils.getStageName(this); - PCollection<BeamSQLRow> sourceStream = sourceTable.buildIOReader(planCreator.getPipeline()); - - return sourceStream; + TupleTag<BeamSQLRow> sourceTupleTag = new TupleTag<BeamSQLRow>(sourceName); + if (inputPCollections.has(sourceTupleTag)) { + //choose PCollection from input PCollectionTuple if exists there. 
+ PCollection<BeamSQLRow> sourceStream = inputPCollections + .get(new TupleTag<BeamSQLRow>(sourceName)); + return sourceStream; + } else { + //If not, the source PColection is provided with BaseBeamTable.buildIOReader(). + BaseBeamTable sourceTable = BeamSqlEnv.findTable(sourceName); + return sourceTable.buildIOReader(inputPCollections.getPipeline()); + } } } http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java index 954868d..e2645f1 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java @@ -21,13 +21,14 @@ import java.util.List; import org.apache.beam.dsls.sql.interpreter.BeamSQLExpressionExecutor; import org.apache.beam.dsls.sql.interpreter.BeamSQLFnExecutor; -import org.apache.beam.dsls.sql.planner.BeamPipelineCreator; import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils; import org.apache.beam.dsls.sql.schema.BeamSQLRecordType; import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; import org.apache.beam.dsls.sql.transform.BeamSQLProjectFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; @@ -60,21 +61,21 @@ public class BeamProjectRel extends Project implements BeamRelNode { } @Override - public PCollection<BeamSQLRow> buildBeamPipeline(BeamPipelineCreator planCreator) + public PCollection<BeamSQLRow> buildBeamPipeline(PCollectionTuple inputPCollections) throws Exception { RelNode input = getInput(); String stageName = BeamSQLRelUtils.getStageName(this); - PCollection<BeamSQLRow> upstream = BeamSQLRelUtils.getBeamRelInput(input) - .buildBeamPipeline(planCreator); + PCollection<BeamSQLRow> upstream = + BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections); BeamSQLExpressionExecutor executor = new BeamSQLFnExecutor(this); PCollection<BeamSQLRow> projectStream = upstream.apply(stageName, ParDo .of(new BeamSQLProjectFn(getRelTypeName(), executor, BeamSQLRecordType.from(rowType)))); + projectStream.setCoder(new BeamSqlRowCoder(BeamSQLRecordType.from(getRowType()))); return projectStream; - } } http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java index ff2b5b6..ed58090 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java @@ -20,18 +20,19 @@ package org.apache.beam.dsls.sql.rel; import org.apache.beam.dsls.sql.planner.BeamPipelineCreator; import org.apache.beam.dsls.sql.schema.BeamSQLRow; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.calcite.rel.RelNode; /** - * A new method {@link #buildBeamPipeline(BeamPipelineCreator)} 
is added, it's + * A new method {@link #buildBeamPipeline(PCollectionTuple)} is added, it's * called by {@link BeamPipelineCreator}. - * */ public interface BeamRelNode extends RelNode { /** - * {@code #buildBeamPipeline(BeamPipelineCreator)} applies a transform to upstream, - * and generate an output {@code PCollection}. + * A {@link BeamRelNode} is a recursive structure, the + * {@link BeamPipelineCreator} visits it with a DFS(Depth-First-Search) + * algorithm. */ - PCollection<BeamSQLRow> buildBeamPipeline(BeamPipelineCreator planCreator) throws Exception; + PCollection<BeamSQLRow> buildBeamPipeline(PCollectionTuple inputPCollections) throws Exception; } http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java index 3df2f34..06a4edf 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java @@ -26,16 +26,19 @@ import java.util.Comparator; import java.util.List; import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException; -import org.apache.beam.dsls.sql.planner.BeamPipelineCreator; import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils; +import org.apache.beam.dsls.sql.schema.BeamSQLRecordType; import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; import org.apache.beam.dsls.sql.schema.UnsupportedDataTypeException; +import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Top; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelCollation; @@ -120,11 +123,11 @@ public class BeamSortRel extends Sort implements BeamRelNode { } } - @Override public PCollection<BeamSQLRow> buildBeamPipeline( - BeamPipelineCreator planCreator) throws Exception { + @Override public PCollection<BeamSQLRow> buildBeamPipeline(PCollectionTuple inputPCollections) + throws Exception { RelNode input = getInput(); PCollection<BeamSQLRow> upstream = BeamSQLRelUtils.getBeamRelInput(input) - .buildBeamPipeline(planCreator); + .buildBeamPipeline(inputPCollections); Type windowType = upstream.getWindowingStrategy().getWindowFn() .getWindowTypeDescriptor().getType(); if (!windowType.equals(GlobalWindow.class)) { @@ -137,16 +140,20 @@ public class BeamSortRel extends Sort implements BeamRelNode { // first find the top (offset + count) PCollection<List<BeamSQLRow>> rawStream = upstream.apply("extractTopOffsetAndFetch", - Top.of(startIndex + count, comparator).withoutDefaults()); + Top.of(startIndex + count, comparator).withoutDefaults()) + .setCoder(ListCoder.<BeamSQLRow>of(upstream.getCoder())); // strip the `leading offset` if (startIndex > 0) { rawStream = rawStream.apply("stripLeadingOffset", ParDo.of( - new SubListFn<BeamSQLRow>(startIndex, startIndex + count))); + new SubListFn<BeamSQLRow>(startIndex, startIndex + count))) + .setCoder(ListCoder.<BeamSQLRow>of(upstream.getCoder())); } 
PCollection<BeamSQLRow> orderedStream = rawStream.apply( "flatten", Flatten.<BeamSQLRow>iterables()); + orderedStream.setCoder(new BeamSqlRowCoder(BeamSQLRecordType.from(getRowType()))); + return orderedStream; } http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java index 4fbe7ec..ea59906 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java @@ -23,13 +23,14 @@ import com.google.common.collect.ImmutableList; import java.util.ArrayList; import java.util.List; -import org.apache.beam.dsls.sql.planner.BeamPipelineCreator; import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils; import org.apache.beam.dsls.sql.schema.BeamSQLRecordType; import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; import org.apache.beam.dsls.sql.schema.BeamTableUtils; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.core.Values; @@ -56,8 +57,8 @@ public class BeamValuesRel extends Values implements BeamRelNode { } - @Override public PCollection<BeamSQLRow> buildBeamPipeline( - BeamPipelineCreator planCreator) throws Exception { + @Override public PCollection<BeamSQLRow> buildBeamPipeline(PCollectionTuple inputPCollections) + throws Exception { List<BeamSQLRow> rows = new ArrayList<>(tuples.size()); String stageName = BeamSQLRelUtils.getStageName(this); if (tuples.isEmpty()) { @@ -73,6 +74,7 @@ public class BeamValuesRel extends Values implements BeamRelNode { rows.add(row); } - return planCreator.getPipeline().apply(stageName, Create.of(rows)); + return inputPCollections.getPipeline().apply(stageName, Create.of(rows)) + .setCoder(new BeamSqlRowCoder(beamSQLRecordType)); } } http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordType.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordType.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordType.java index 94531f0..e8fa82f 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordType.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordType.java @@ -21,7 +21,10 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.List; import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeFactory.FieldInfoBuilder; import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rel.type.RelProtoDataType; import org.apache.calcite.sql.type.SqlTypeName; /** @@ -33,6 +36,9 @@ public class BeamSQLRecordType implements Serializable { private List<String> fieldsName = new ArrayList<>(); private List<SqlTypeName> fieldsType = new ArrayList<>(); + /** + * Generate from {@link RelDataType} which is used to create table. 
+ */ public static BeamSQLRecordType from(RelDataType tableInfo) { BeamSQLRecordType record = new BeamSQLRecordType(); for (RelDataTypeField f : tableInfo.getFieldList()) { @@ -47,6 +53,22 @@ public class BeamSQLRecordType implements Serializable { fieldsType.add(fieldType); } + /** + * Create an instance of {@link RelDataType} so it can be used to create a table. + */ + public RelProtoDataType toRelDataType() { + return new RelProtoDataType() { + @Override + public RelDataType apply(RelDataTypeFactory a) { + FieldInfoBuilder builder = a.builder(); + for (int idx = 0; idx < fieldsName.size(); ++idx) { + builder.add(fieldsName.get(idx), fieldsType.get(idx)); + } + return builder.build(); + } + }; + } + public int size() { return fieldsName.size(); } http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordTypeCoder.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordTypeCoder.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordTypeCoder.java deleted file mode 100644 index b88a195..0000000 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordTypeCoder.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql.schema; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.List; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.StandardCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.coders.VarIntCoder; -import org.apache.calcite.sql.type.SqlTypeName; - -/** - * A {@link Coder} for {@link BeamSQLRecordType}. 
- * - */ -public class BeamSQLRecordTypeCoder extends StandardCoder<BeamSQLRecordType> { - private static final StringUtf8Coder stringCoder = StringUtf8Coder.of(); - private static final VarIntCoder intCoder = VarIntCoder.of(); - - private static final BeamSQLRecordTypeCoder INSTANCE = new BeamSQLRecordTypeCoder(); - private BeamSQLRecordTypeCoder(){} - - public static BeamSQLRecordTypeCoder of() { - return INSTANCE; - } - - @Override - public void encode(BeamSQLRecordType value, OutputStream outStream, - org.apache.beam.sdk.coders.Coder.Context context) throws CoderException, IOException { - Context nested = context.nested(); - intCoder.encode(value.size(), outStream, nested); - for (String fieldName : value.getFieldsName()) { - stringCoder.encode(fieldName, outStream, nested); - } - for (SqlTypeName fieldType : value.getFieldsType()) { - stringCoder.encode(fieldType.name(), outStream, nested); - } - //add a dummy field to indicate the end of record - intCoder.encode(value.size(), outStream, context); - } - - @Override - public BeamSQLRecordType decode(InputStream inStream, - org.apache.beam.sdk.coders.Coder.Context context) throws CoderException, IOException { - BeamSQLRecordType typeRecord = new BeamSQLRecordType(); - int size = intCoder.decode(inStream, context.nested()); - for (int idx = 0; idx < size; ++idx) { - typeRecord.getFieldsName().add(stringCoder.decode(inStream, context.nested())); - } - for (int idx = 0; idx < size; ++idx) { - typeRecord.getFieldsType().add( - SqlTypeName.valueOf(stringCoder.decode(inStream, context.nested()))); - } - intCoder.decode(inStream, context); - return typeRecord; - } - - @Override - public List<? extends Coder<?>> getCoderArguments() { - return null; - } - - @Override - public void verifyDeterministic() - throws org.apache.beam.sdk.coders.Coder.NonDeterministicException { - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java index 0bfe467..f161d27 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java @@ -23,24 +23,22 @@ import java.io.OutputStream; import java.util.Date; import java.util.GregorianCalendar; import java.util.List; - import org.apache.beam.sdk.coders.BigDecimalCoder; import org.apache.beam.sdk.coders.BigEndianIntegerCoder; import org.apache.beam.sdk.coders.BigEndianLongCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.DoubleCoder; import org.apache.beam.sdk.coders.InstantCoder; import org.apache.beam.sdk.coders.ListCoder; -import org.apache.beam.sdk.coders.StandardCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; /** - * A {@link Coder} encodes {@link BeamSQLRow}. - * + * A {@link Coder} encodes {@link BeamSQLRow}. 
*/ -public class BeamSqlRowCoder extends StandardCoder<BeamSQLRow>{ - private static final BeamSQLRecordTypeCoder recordTypeCoder = BeamSQLRecordTypeCoder.of(); +public class BeamSqlRowCoder extends CustomCoder<BeamSQLRow> { + private BeamSQLRecordType tableSchema; private static final ListCoder<Integer> listCoder = ListCoder.of(BigEndianIntegerCoder.of()); @@ -51,17 +49,13 @@ public class BeamSqlRowCoder extends StandardCoder<BeamSQLRow>{ private static final InstantCoder instantCoder = InstantCoder.of(); private static final BigDecimalCoder bigDecimalCoder = BigDecimalCoder.of(); - private static final BeamSqlRowCoder INSTANCE = new BeamSqlRowCoder(); - private BeamSqlRowCoder(){} - - public static BeamSqlRowCoder of() { - return INSTANCE; + public BeamSqlRowCoder(BeamSQLRecordType tableSchema) { + this.tableSchema = tableSchema; } @Override public void encode(BeamSQLRow value, OutputStream outStream, org.apache.beam.sdk.coders.Coder.Context context) throws CoderException, IOException { - recordTypeCoder.encode(value.getDataType(), outStream, context.nested()); listCoder.encode(value.getNullFields(), outStream, context.nested()); for (int idx = 0; idx < value.size(); ++idx) { @@ -115,18 +109,17 @@ public class BeamSqlRowCoder extends StandardCoder<BeamSQLRow>{ @Override public BeamSQLRow decode(InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context) throws CoderException, IOException { - BeamSQLRecordType type = recordTypeCoder.decode(inStream, context.nested()); List<Integer> nullFields = listCoder.decode(inStream, context.nested()); - BeamSQLRow record = new BeamSQLRow(type); + BeamSQLRow record = new BeamSQLRow(tableSchema); record.setNullFields(nullFields); - for (int idx = 0; idx < type.size(); ++idx) { + for (int idx = 0; idx < tableSchema.size(); ++idx) { if (nullFields.contains(idx)) { continue; } - switch (type.getFieldsType().get(idx)) { + switch (tableSchema.getFieldsType().get(idx)) { case INTEGER: record.addField(idx, intCoder.decode(inStream, context.nested())); break; @@ -162,7 +155,7 @@ public class BeamSqlRowCoder extends StandardCoder<BeamSQLRow>{ break; default: - throw new UnsupportedDataTypeException(type.getFieldsType().get(idx)); + throw new UnsupportedDataTypeException(tableSchema.getFieldsType().get(idx)); } } @@ -172,15 +165,12 @@ public class BeamSqlRowCoder extends StandardCoder<BeamSQLRow>{ return record; } - @Override - public List<? 
extends Coder<?>> getCoderArguments() { - return null; + public BeamSQLRecordType getTableSchema() { + return tableSchema; } @Override public void verifyDeterministic() throws org.apache.beam.sdk.coders.Coder.NonDeterministicException { - } - } http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java index 0d9d147..03f7705 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java @@ -21,7 +21,7 @@ import java.util.Arrays; import java.util.Date; import java.util.HashMap; import java.util.Map; -import org.apache.beam.dsls.sql.BeamSQLEnvironment; +import org.apache.beam.dsls.sql.BeamSqlEnv; import org.apache.beam.dsls.sql.schema.BaseBeamTable; import org.apache.beam.dsls.sql.schema.BeamSQLRecordType; import org.apache.beam.dsls.sql.schema.BeamSQLRow; @@ -38,13 +38,11 @@ import org.junit.BeforeClass; * */ public class BasePlanner { - public static BeamSQLEnvironment runner = BeamSQLEnvironment.create(); - @BeforeClass public static void prepareClass() { - runner.addTableMetadata("ORDER_DETAILS", getTable()); - runner.addTableMetadata("SUB_ORDER", getTable("127.0.0.1:9092", "sub_orders")); - runner.addTableMetadata("SUB_ORDER_RAM", getTable()); + BeamSqlEnv.registerTable("ORDER_DETAILS", getTable()); + BeamSqlEnv.registerTable("SUB_ORDER", getTable("127.0.0.1:9092", "sub_orders")); + BeamSqlEnv.registerTable("SUB_ORDER_RAM", getTable()); } private static BaseBeamTable getTable() { http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByExplainTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByExplainTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByExplainTest.java index 98d14c3..4ea0662 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByExplainTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByExplainTest.java @@ -17,6 +17,8 @@ */ package org.apache.beam.dsls.sql.planner; +import org.apache.beam.dsls.sql.BeamSqlCli; +import org.apache.beam.dsls.sql.BeamSqlEnv; import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlUdfExpressionTest; import org.junit.Test; @@ -33,7 +35,7 @@ public class BeamGroupByExplainTest extends BasePlanner { public void testSimpleGroupExplain() throws Exception { String sql = "SELECT COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 "; - String plan = runner.executionPlan(sql); + String plan = BeamSqlCli.explainQuery(sql); } /** @@ -43,7 +45,7 @@ public class BeamGroupByExplainTest extends BasePlanner { public void testSimpleGroup2Explain() throws Exception { String sql = "SELECT site_id" + ", COUNT(*) " + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 " + "GROUP BY site_id"; - String plan = runner.executionPlan(sql); + String plan = BeamSqlCli.explainQuery(sql); } /** @@ -54,7 +56,7 @@ public class BeamGroupByExplainTest extends BasePlanner { String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id" + 
", TUMBLE(order_time, INTERVAL '1' HOUR)"; - String plan = runner.executionPlan(sql); + String plan = BeamSqlCli.explainQuery(sql); } /** @@ -66,7 +68,7 @@ public class BeamGroupByExplainTest extends BasePlanner { + "TUMBLE_START(order_time, INTERVAL '1' HOUR, TIME '00:00:01')" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id" + ", TUMBLE(order_time, INTERVAL '1' HOUR, TIME '00:00:01')"; - String plan = runner.executionPlan(sql); + String plan = BeamSqlCli.explainQuery(sql); } /** @@ -77,7 +79,7 @@ public class BeamGroupByExplainTest extends BasePlanner { String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id" + ", HOP(order_time, INTERVAL '5' MINUTE, INTERVAL '1' HOUR)"; - String plan = runner.executionPlan(sql); + String plan = BeamSqlCli.explainQuery(sql); } /** @@ -88,7 +90,7 @@ public class BeamGroupByExplainTest extends BasePlanner { String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id" + ", SESSION(order_time, INTERVAL '5' MINUTE)"; - String plan = runner.executionPlan(sql); + String plan = BeamSqlCli.explainQuery(sql); } /** @@ -96,9 +98,9 @@ public class BeamGroupByExplainTest extends BasePlanner { */ @Test public void testUdf() throws Exception { - runner.addUDFFunction("negative", BeamSqlUdfExpressionTest.UdfFn.class, "negative"); + BeamSqlEnv.registerUdf("negative", BeamSqlUdfExpressionTest.UdfFn.class, "negative"); String sql = "select site_id, negative(site_id) as nsite_id from ORDER_DETAILS"; - String plan = runner.executionPlan(sql); + String plan = BeamSqlCli.explainQuery(sql); } } http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java index 5101c98..0436ca1 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java @@ -17,6 +17,8 @@ */ package org.apache.beam.dsls.sql.planner; +import org.apache.beam.dsls.sql.BeamSqlCli; +import org.apache.beam.dsls.sql.BeamSqlEnv; import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlUdfExpressionTest; import org.apache.beam.dsls.sql.schema.BeamSQLRow; import org.apache.beam.sdk.testing.TestPipeline; @@ -37,7 +39,7 @@ public class BeamGroupByPipelineTest extends BasePlanner { public void testSimpleGroupExplain() throws Exception { String sql = "SELECT COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 "; - PCollection<BeamSQLRow> outputStream = runner.compileBeamPipeline(sql, pipeline); + PCollection<BeamSQLRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline); } /** @@ -47,7 +49,7 @@ public class BeamGroupByPipelineTest extends BasePlanner { public void testSimpleGroup2Explain() throws Exception { String sql = "SELECT site_id" + ", COUNT(*) " + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 " + "GROUP BY site_id"; - PCollection<BeamSQLRow> outputStream = runner.compileBeamPipeline(sql, pipeline); + PCollection<BeamSQLRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline); } /** @@ -58,7 +60,7 @@ public class 
BeamGroupByPipelineTest extends BasePlanner { String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id" + ", TUMBLE(order_time, INTERVAL '1' HOUR)"; - PCollection<BeamSQLRow> outputStream = runner.compileBeamPipeline(sql, pipeline); + PCollection<BeamSQLRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline); } /** @@ -70,7 +72,7 @@ public class BeamGroupByPipelineTest extends BasePlanner { + "TUMBLE_START(order_time, INTERVAL '1' HOUR, TIME '00:00:01')" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id" + ", TUMBLE(order_time, INTERVAL '1' HOUR, TIME '00:00:01')"; - PCollection<BeamSQLRow> outputStream = runner.compileBeamPipeline(sql, pipeline); + PCollection<BeamSQLRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline); } /** @@ -81,7 +83,7 @@ public class BeamGroupByPipelineTest extends BasePlanner { String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id" + ", HOP(order_time, INTERVAL '5' MINUTE, INTERVAL '1' HOUR)"; - PCollection<BeamSQLRow> outputStream = runner.compileBeamPipeline(sql, pipeline); + PCollection<BeamSQLRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline); } /** @@ -92,7 +94,7 @@ public class BeamGroupByPipelineTest extends BasePlanner { String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id" + ", SESSION(order_time, INTERVAL '5' MINUTE)"; - PCollection<BeamSQLRow> outputStream = runner.compileBeamPipeline(sql, pipeline); + PCollection<BeamSQLRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline); } /** @@ -100,10 +102,10 @@ public class BeamGroupByPipelineTest extends BasePlanner { */ @Test public void testUdf() throws Exception { - runner.addUDFFunction("negative", BeamSqlUdfExpressionTest.UdfFn.class, "negative"); + BeamSqlEnv.registerUdf("negative", BeamSqlUdfExpressionTest.UdfFn.class, "negative"); String sql = "select site_id, negative(site_id) as nsite_id from ORDER_DETAILS"; - PCollection<BeamSQLRow> outputStream = runner.compileBeamPipeline(sql, pipeline); + PCollection<BeamSQLRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline); } } http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamInvalidGroupByTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamInvalidGroupByTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamInvalidGroupByTest.java index 72b5bf7..946a9fd 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamInvalidGroupByTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamInvalidGroupByTest.java @@ -17,6 +17,7 @@ */ package org.apache.beam.dsls.sql.planner; +import org.apache.beam.dsls.sql.BeamSqlCli; import org.apache.beam.dsls.sql.schema.BeamSQLRow; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.PCollection; @@ -36,7 +37,7 @@ public class BeamInvalidGroupByTest extends BasePlanner { public void testTumble2Explain() throws Exception { String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 " + "GROUP BY order_id" + ", TUMBLE(order_time, INTERVAL '1' HOUR)"; - PCollection<BeamSQLRow> outputStream 
= runner.compileBeamPipeline(sql, pipeline); + PCollection<BeamSQLRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline); } @Test(expected = ValidationException.class) @@ -44,7 +45,7 @@ public class BeamInvalidGroupByTest extends BasePlanner { String sql = "SELECT order_id, site_id, TUMBLE(order_time, INTERVAL '1' HOUR)" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id" + ", TUMBLE(order_time, INTERVAL '1' HOUR)"; - PCollection<BeamSQLRow> outputStream = runner.compileBeamPipeline(sql, pipeline); + PCollection<BeamSQLRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline); } } http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java index ffc3e01..a296eec 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java @@ -22,7 +22,8 @@ import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Arrays; -import org.apache.beam.dsls.sql.BeamSQLEnvironment; +import org.apache.beam.dsls.sql.BeamSqlCli; +import org.apache.beam.dsls.sql.BeamSqlEnv; import org.apache.beam.dsls.sql.schema.BaseBeamTable; import org.apache.beam.dsls.sql.schema.BeamSQLRecordType; import org.apache.beam.dsls.sql.schema.BeamSQLRow; @@ -43,15 +44,14 @@ import org.junit.Test; */ public class BeamPlannerAggregationSubmitTest { public static DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - public static BeamSQLEnvironment runner = BeamSQLEnvironment.create(); @Rule public final TestPipeline pipeline = TestPipeline.create(); @BeforeClass public static void prepareClass() throws ParseException { - runner.addTableMetadata("ORDER_DETAILS", getOrderTable()); - runner.addTableMetadata("ORDER_SUMMARY", getSummaryTable()); + BeamSqlEnv.registerTable("ORDER_DETAILS", getOrderTable()); + BeamSqlEnv.registerTable("ORDER_SUMMARY", getSummaryTable()); } @Before @@ -120,7 +120,7 @@ public class BeamPlannerAggregationSubmitTest { + "WHERE SITE_ID = 1 " + "GROUP BY site_id" + ", TUMBLE(order_time, INTERVAL '1' HOUR, TIME '00:00:01')"; - runner.compileBeamPipeline(sql, pipeline); + BeamSqlCli.compilePipeline(sql, pipeline); pipeline.run().waitUntilFinish(); @@ -137,7 +137,7 @@ public class BeamPlannerAggregationSubmitTest { + "SELECT site_id, COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 " + "GROUP BY site_id"; - runner.compileBeamPipeline(sql, pipeline); + BeamSqlCli.compilePipeline(sql, pipeline); pipeline.run().waitUntilFinish(); http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java index 1355d5d..e617ff2 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java +++ 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java @@ -17,6 +17,7 @@ */ package org.apache.beam.dsls.sql.planner; +import org.apache.beam.dsls.sql.BeamSqlCli; import org.junit.Assert; import org.junit.Test; @@ -28,7 +29,7 @@ public class BeamPlannerExplainTest extends BasePlanner { @Test public void selectAll() throws Exception { String sql = "SELECT * FROM ORDER_DETAILS"; - String plan = runner.executionPlan(sql); + String plan = BeamSqlCli.explainQuery(sql); String expectedPlan = "BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2], order_time=[$3])\n" @@ -40,7 +41,7 @@ public class BeamPlannerExplainTest extends BasePlanner { public void selectWithFilter() throws Exception { String sql = "SELECT " + " order_id, site_id, price " + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 and price > 20"; - String plan = runner.executionPlan(sql); + String plan = BeamSqlCli.explainQuery(sql); String expectedPlan = "BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2])\n" + " BeamFilterRel(condition=[AND(=($1, 0), >($2, 20))])\n" @@ -53,7 +54,7 @@ public class BeamPlannerExplainTest extends BasePlanner { String sql = "INSERT INTO SUB_ORDER(order_id, site_id, price) " + "SELECT " + " order_id, site_id, price " + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 and price > 20"; - String plan = runner.executionPlan(sql); + String plan = BeamSqlCli.explainQuery(sql); String expectedPlan = "BeamIOSinkRel(table=[[SUB_ORDER]], operation=[INSERT], flattened=[true])\n" http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java index 7219d11..8a48618 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java @@ -17,6 +17,7 @@ */ package org.apache.beam.dsls.sql.planner; +import org.apache.beam.dsls.sql.BeamSqlCli; import org.apache.beam.dsls.sql.schema.BeamSQLRow; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.PCollection; @@ -44,7 +45,7 @@ public class BeamPlannerSubmitTest extends BasePlanner { + " order_id, site_id, price " + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 and price > 20"; - PCollection<BeamSQLRow> outputStream = runner.compileBeamPipeline(sql, pipeline); + PCollection<BeamSQLRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline); pipeline.run().waitUntilFinish(); http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java index 4935c3b..a44b0d9 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java @@ -22,7 +22,8 @@ import java.util.Collection; import java.util.Date; import java.util.Iterator; -import org.apache.beam.dsls.sql.BeamSQLEnvironment; +import org.apache.beam.dsls.sql.BeamSqlCli; +import org.apache.beam.dsls.sql.BeamSqlEnv; import 
org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException; import org.apache.beam.dsls.sql.planner.MockedBeamSQLTable; import org.apache.beam.dsls.sql.schema.BeamSQLRow; @@ -37,7 +38,6 @@ import org.junit.Test; * Test for {@code BeamSortRel}. */ public class BeamSortRelTest { - public static BeamSQLEnvironment runner = BeamSQLEnvironment.create(); @Rule public final TestPipeline pipeline = TestPipeline.create(); @@ -71,7 +71,7 @@ public class BeamSortRelTest { + "ORDER BY order_id asc, site_id desc limit 4"; System.out.println(sql); - runner.compileBeamPipeline(sql, pipeline); + BeamSqlCli.compilePipeline(sql, pipeline); pipeline.run().waitUntilFinish(); assertEquals( @@ -88,7 +88,7 @@ public class BeamSortRelTest { @Test public void testOrderBy_nullsFirst() throws Exception { - runner.addTableMetadata("ORDER_DETAILS", MockedBeamSQLTable + BeamSqlEnv.registerTable("ORDER_DETAILS", MockedBeamSQLTable .of(SqlTypeName.BIGINT, "order_id", SqlTypeName.INTEGER, "site_id", SqlTypeName.DOUBLE, "price", @@ -98,7 +98,7 @@ public class BeamSortRelTest { 2L, 1, 3.0, 2L, null, 4.0, 5L, 5, 5.0)); - runner.addTableMetadata("SUB_ORDER_RAM", MockedBeamSQLTable + BeamSqlEnv.registerTable("SUB_ORDER_RAM", MockedBeamSQLTable .of(SqlTypeName.BIGINT, "order_id", SqlTypeName.INTEGER, "site_id", SqlTypeName.DOUBLE, "price")); @@ -108,7 +108,7 @@ public class BeamSortRelTest { + "FROM ORDER_DETAILS " + "ORDER BY order_id asc, site_id desc NULLS FIRST limit 4"; - runner.compileBeamPipeline(sql, pipeline); + BeamSqlCli.compilePipeline(sql, pipeline); pipeline.run().waitUntilFinish(); assertEquals( @@ -126,7 +126,7 @@ public class BeamSortRelTest { @Test public void testOrderBy_nullsLast() throws Exception { - runner.addTableMetadata("ORDER_DETAILS", MockedBeamSQLTable + BeamSqlEnv.registerTable("ORDER_DETAILS", MockedBeamSQLTable .of(SqlTypeName.BIGINT, "order_id", SqlTypeName.INTEGER, "site_id", SqlTypeName.DOUBLE, "price", @@ -136,7 +136,7 @@ public class BeamSortRelTest { 2L, 1, 3.0, 2L, null, 4.0, 5L, 5, 5.0)); - runner.addTableMetadata("SUB_ORDER_RAM", MockedBeamSQLTable + BeamSqlEnv.registerTable("SUB_ORDER_RAM", MockedBeamSQLTable .of(SqlTypeName.BIGINT, "order_id", SqlTypeName.INTEGER, "site_id", SqlTypeName.DOUBLE, "price")); @@ -146,7 +146,7 @@ public class BeamSortRelTest { + "FROM ORDER_DETAILS " + "ORDER BY order_id asc, site_id desc NULLS LAST limit 4"; - runner.compileBeamPipeline(sql, pipeline); + BeamSqlCli.compilePipeline(sql, pipeline); pipeline.run().waitUntilFinish(); assertEquals( @@ -169,7 +169,7 @@ public class BeamSortRelTest { + "FROM ORDER_DETAILS " + "ORDER BY order_id asc, site_id desc limit 4 offset 4"; - runner.compileBeamPipeline(sql, pipeline); + BeamSqlCli.compilePipeline(sql, pipeline); pipeline.run().waitUntilFinish(); assertEquals( @@ -192,7 +192,7 @@ public class BeamSortRelTest { + "FROM ORDER_DETAILS " + "ORDER BY order_id asc, site_id desc limit 11"; - runner.compileBeamPipeline(sql, pipeline); + BeamSqlCli.compilePipeline(sql, pipeline); pipeline.run().waitUntilFinish(); assertEquals( @@ -223,13 +223,13 @@ public class BeamSortRelTest { + "ORDER BY order_id asc limit 11"; TestPipeline pipeline = TestPipeline.create(); - runner.compileBeamPipeline(sql, pipeline); + BeamSqlCli.compilePipeline(sql, pipeline); } @Before public void prepare() { - runner.addTableMetadata("ORDER_DETAILS", orderDetailTable); - runner.addTableMetadata("SUB_ORDER_RAM", subOrderRamTable); + BeamSqlEnv.registerTable("ORDER_DETAILS", orderDetailTable); + BeamSqlEnv.registerTable("SUB_ORDER_RAM", 
subOrderRamTable); MockedBeamSQLTable.CONTENT.clear(); }
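
For orientation only (not part of the commit): a minimal sketch of how the refactored static entry points fit together, based solely on the calls visible in the updated tests above (BeamSqlEnv.registerTable/registerUdf, BeamSqlCli.explainQuery/compilePipeline). The table name, schema, and sample values are illustrative; MockedBeamSQLTable and the UDF class come from the test sources shown in this diff.

    // Register table metadata once; BeamSqlEnv holds it in static state.
    BeamSqlEnv.registerTable("ORDER_DETAILS",
        MockedBeamSQLTable.of(SqlTypeName.BIGINT, "order_id",
                              SqlTypeName.INTEGER, "site_id",
                              SqlTypeName.DOUBLE, "price",
                              1L, 1, 1.0));

    // Optionally register a UDF by class and method name.
    BeamSqlEnv.registerUdf("negative", BeamSqlUdfExpressionTest.UdfFn.class, "negative");

    // Inspect the Calcite execution plan for a statement.
    String plan = BeamSqlCli.explainQuery("SELECT * FROM ORDER_DETAILS");

    // Or translate the statement into transforms on an existing pipeline and run it.
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());
    PCollection<BeamSQLRow> result =
        BeamSqlCli.compilePipeline("SELECT order_id, price FROM ORDER_DETAILS", pipeline);
    pipeline.run().waitUntilFinish();

Note that, unlike the removed BeamSQLEnvironment instance, both BeamSqlEnv and BeamSqlCli are used here through static methods, which is why the tests in this commit drop their shared "runner" field.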
