This is an automated email from the ASF dual-hosted git repository.

mergebot-role pushed a commit to branch mergebot
in repository https://gitbox.apache.org/repos/asf/beam-site.git
commit c8ae4c444af7ee37d1ce00383e1ef322ffbc8bb9
Author: mingmxu <ming...@ebay.com>
AuthorDate: Tue Jul 18 16:45:06 2017 -0700

    add intro of BeamSqlRow, trigger settings of aggregation/join, other misc spelling correction
---
 src/documentation/dsls/sql.md | 67 +++++++++++++++++++++++++++++++------------
 1 file changed, 49 insertions(+), 18 deletions(-)

diff --git a/src/documentation/dsls/sql.md b/src/documentation/dsls/sql.md
index c9cb0f7..8834a93 100644
--- a/src/documentation/dsls/sql.md
+++ b/src/documentation/dsls/sql.md
@@ -9,13 +9,14 @@ permalink: /documentation/dsls/sql/
* [3. Usage of DSL APIs](#usage)
* [4. Functionality in Beam SQL](#functionality)
  * [4.1. Supported Features](#features)
-  * [4.2. Data Types](#data-type)
-  * [4.3. built-in SQL functions](#built-in-functions)
+  * [4.2. Intro of BeamSqlRow](#beamsqlrow)
+  * [4.3. Data Types](#data-type)
+  * [4.4. built-in SQL functions](#built-in-functions)

This page describes the implementation of Beam SQL, and how to simplify a Beam pipeline with DSL APIs.

# <a name="overview"></a>1. Overview
-SQL is a well-adopted standard to process data with concise syntax. With DSL APIs, now `PCollection`s can be queried with standard SQL statements, like a regular table. The DSL APIs leverages [Apache Calcite](http://calcite.apache.org/) to parse and optimize SQL queries, then translate into a composite Beam `PTransform`. In this way, both SQL and normal Beam `PTransform`s can be mixed in the same pipeline.
+SQL is a well-adopted standard to process data with concise syntax. With DSL APIs, `PCollection`s can now be queried with standard SQL statements, like a regular table. The DSL APIs leverage [Apache Calcite](http://calcite.apache.org/) to parse and optimize SQL queries, then translate them into a composite Beam `PTransform`. In this way, both SQL and normal Beam `PTransform`s can be mixed in the same pipeline.

# <a name="internal-of-sql"></a>2. The Internal of Beam SQL
Figure 1 describes the back-end steps from a SQL statement to a Beam `PTransform`.

@@ -24,14 +25,14 @@ Figure 1 describes the back-end steps from a SQL statement to a Beam `PTransform

**Figure 1** workflow of Beam SQL DSL

-Given a PCollection and the query as input, first of all the input PCollection is registered as a table in the schema repository. Then it's processed as:
+Given a `PCollection` and the query as input, the input `PCollection` is first registered as a table in the schema repository. Then it's processed as follows:

1. The SQL query is parsed according to grammar to generate a SQL Abstract Syntax Tree;
2. The tree is validated against the table schema, and output as a logical plan represented with relational algebra;
3. Relational rules are applied to convert it to a physical plan, expressed with Beam components. An optimizer may optionally update the plan;
4. Finally, the Beam physical plan is compiled as a composite `PTransform`;

-Here is an example to show a query that filters and projects from a input PCollection:
+Here is an example to show a query that filters and projects from an input `PCollection`:

```
SELECT USER_ID, USER_NAME FROM PCOLLECTION WHERE USER_ID = 1
```

@@ -55,7 +56,7 @@ pCollection.apply(BeamSqlFilter...)
# <a name="usage"></a>3. Usage of DSL APIs
The DSL interface (`BeamSql.query()` and `BeamSql.simpleQuery()`) is the only endpoint exposed to developers. It wraps the back-end details of parsing/validation/assembling to deliver a Beam SDK style API that can take either simple TABLE_FILTER queries or complex queries containing JOIN/GROUP_BY etc.
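For orientation, here is a minimal sketch of how the two endpoints are invoked. It assumes an existing `PCollection<BeamSqlRow>` named `input`; the table name `TABLE_A` is illustrative only, and the complete, authoritative example is the BeamSqlExample linked below:

```
//sketch only: BeamSql.simpleQuery() takes a single input PCollection,
//queried under the fixed table name PCOLLECTION
PCollection<BeamSqlRow> filtered = input.apply(
    BeamSql.simpleQuery("SELECT USER_ID, USER_NAME FROM PCOLLECTION WHERE USER_ID = 1"));

//sketch only: BeamSql.query() takes a PCollectionTuple, with each input
//registered under the table name given by its TupleTag
PCollection<BeamSqlRow> projected = PCollectionTuple
    .of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
    .apply(BeamSql.query("SELECT USER_ID, USER_NAME FROM TABLE_A WHERE USER_ID = 1"));
```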
-*Note*, the two APIs are equivalent in functionality, `BeamSql.query()` applies on a `PCollectionTuple` with one or many input `PCollection`s, and `BeamSql.simpleQuery()` is a simplified API which applies on single PCollections.
+*Note*: the two APIs are equivalent in functionality. `BeamSql.query()` applies to a `PCollectionTuple` with one or many input `PCollection`s, and `BeamSql.simpleQuery()` is a simplified API which applies to a single `PCollection`.

[BeamSqlExample](https://github.com/apache/beam/blob/DSL_SQL/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java) in the code repository shows the usage of both APIs:

@@ -77,10 +78,10 @@ PCollection<BeamSqlRow> outputStream2 =

In Step 1, a `PCollection<BeamSqlRow>` is prepared as the source dataset. The work to generate a queriable `PCollection<BeamSqlRow>` is beyond the scope of the Beam SQL DSL.

-Step 2(Case 1) shows the way to run a query with `BeamSql.simpleQuery()`, be aware that the input PCollection is named with a fixed table name __PCOLLECTION__. Step 2(Case 2) is an example to run a query with `BeamSql.query()`. A Table name used in the query is specified when adding PCollection to `PCollectionTuple`. As each call of either `BeamSql.query()` or `BeamSql.simpleQuery()` has its own schema repository, developers need to include all PCollections that would be used in your query.
+Step 2 (Case 1) shows the way to run a query with `BeamSql.simpleQuery()`; be aware that the input `PCollection` is registered under the fixed table name __PCOLLECTION__. Step 2 (Case 2) is an example of running a query with `BeamSql.query()`; a table name used in the query is specified when adding a `PCollection` to the `PCollectionTuple`. As each call of either `BeamSql.query()` or `BeamSql.simpleQuery()` has its own schema repository, developers need to include all `PCollection`s that would be used in your query.

# <a name="functionality"></a>4. Functionality in Beam SQL
-Just as the unified model for both bounded and unbounded data in Beam, SQL DSL provides the same functionalities for bounded and unbounded PCollection as well.
+Just as Beam provides a unified model for both bounded and unbounded data, the SQL DSL provides the same functionality for bounded and unbounded `PCollection`s.

Note that SQL support is not yet complete. Queries that include unsupported features will cause an `UnsupportedOperationException`.

@@ -106,7 +107,16 @@ SELECT f_int, COUNT(*) AS `size` FROM PCOLLECTION GROUP BY f_int, HOP(f_timestam
SELECT f_int, COUNT(*) AS `size` FROM PCOLLECTION GROUP BY f_int, SESSION(f_timestamp, INTERVAL '5' MINUTE)
```

-Note: distinct aggregation is not supported yet.
+Note:
+
+1. distinct aggregation is not supported yet;
+2. the default trigger is `Repeatedly.forever(AfterWatermark.pastEndOfWindow())` (see the SDK-level sketch after this note);
+3. when the `time` field in `HOP(dateTime, slide, size [, time ])`/`TUMBLE(dateTime, interval [, time ])`/`SESSION(dateTime, interval [, time ])` is specified, a late-firing trigger is added as
+
+```
+Repeatedly.forever(AfterWatermark.pastEndOfWindow().withLateFirings(AfterProcessingTime
+    .pastFirstElementInPane().plusDelayOf(Duration.millis(delayTime.getTimeInMillis()))));
+```
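To make notes 2 and 3 above concrete, here is a rough, SDK-level sketch of the windowing that the one-hour TUMBLE example implies, written directly against the Beam Java API. The allowed-lateness and accumulation-mode settings are assumptions for illustration, not taken from the implementation:

```
//sketch only: a 1-hour fixed window with the default trigger from note 2;
//allowed lateness and accumulation mode are assumed values
PCollection<BeamSqlRow> windowed = input.apply(
    Window.<BeamSqlRow>into(FixedWindows.of(Duration.standardHours(1)))
        .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes());
```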
**4. Join (inner, left_outer, right_outer);**

@@ -121,6 +131,7 @@ For case 1 and case 2, a standard join is utilized as long as the windowFn of th
* Only equal-join is supported; CROSS JOIN is not supported;
* FULL OUTER JOIN is not supported;
* If it's a LEFT OUTER JOIN, the unbounded table should be on the left side; if it's a RIGHT OUTER JOIN, the unbounded table should be on the right side;
+* The trigger is inherited from the upstream `PCollection`s, and should be consistent across them;

**5. built-in SQL functions**

@@ -136,8 +147,8 @@ A UDF can be any Java method that takes zero or more scalar fields, and return o
/**
 * An example UDF for test.
 */
-public static class CubicInteger{
-  public static Integer cubic(Integer input){
+public static class CubicInteger implements BeamSqlUdf {
+  public static Integer eval(Integer input) {
    return input * input * input;
  }
}
@@ -146,7 +157,7 @@ public static class CubicInteger{
String sql = "SELECT f_int, cubic(f_int) as cubicvalue FROM PCOLLECTION WHERE f_int = 2";
PCollection<BeamSqlRow> result =
    input.apply("udfExample",
-        BeamSql.simpleQuery(sql).withUdf("cubic", CubicInteger.class, "cubic"));
+        BeamSql.simpleQuery(sql).withUdf("cubic", CubicInteger.class));
```

**create and specify User Defined Aggregate Function (UDAF)**

@@ -168,17 +179,17 @@ public static class SquareSum extends BeamSqlUdaf<Integer, Integer, Integer> {
  public SquareSum() {
  }

-  // @Override
+  @Override
  public Integer init() {
    return 0;
  }

-  // @Override
+  @Override
  public Integer add(Integer accumulator, Integer input) {
    return accumulator + input * input;
  }

-  // @Override
+  @Override
  public Integer merge(Iterable<Integer> accumulators) {
    int v = 0;
    Iterator<Integer> ite = accumulators.iterator();
@@ -188,7 +199,7 @@ public static class SquareSum extends BeamSqlUdaf<Integer, Integer, Integer> {
    return v;
  }

-  // @Override
+  @Override
  public Integer result(Integer accumulator) {
    return accumulator;
  }
@@ -201,7 +212,27 @@ PCollection<BeamSqlRow> result =
        BeamSql.simpleQuery(sql).withUdaf("squaresum", SquareSum.class));
```

-## <a name="data-type"></a>4.2. Data Types
+## <a name="beamsqlrow"></a>4.2. Intro of BeamSqlRow
+`BeamSqlRow`, encoded/decoded by `BeamSqlRowCoder`, represents a single, implicitly structured data item in a Beam SQL compatible `PCollection`. Similar to a _row_ in a relational database, each `BeamSqlRow` consists of named columns with fixed types (see [4.3. Data Types](#data-type)).
+
+A Beam SQL `PCollection` can be created from an external source, from in-memory data, or derived from another SQL query. For `PCollection`s from an external source or in-memory data, the coder must be specified explicitly; a `PCollection` derived from a SQL query has the coder set already. Below is one example:
+
+```
+//define the input row format
+List<String> fieldNames = Arrays.asList("c1", "c2", "c3");
+List<Integer> fieldTypes = Arrays.asList(Types.INTEGER, Types.VARCHAR, Types.DOUBLE);
+BeamSqlRecordType type = BeamSqlRecordType.create(fieldNames, fieldTypes);
+BeamSqlRow row = new BeamSqlRow(type);
+row.addField(0, 1);
+row.addField(1, "row");
+row.addField(2, 1.0);
+
+//create a source PCollection with Create.of();
+PCollection<BeamSqlRow> inputTable = PBegin.in(p).apply(Create.of(row)
+    .withCoder(new BeamSqlRowCoder(type)));
+```
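As a follow-up sketch (an assumption for illustration, not code from this commit), the `inputTable` defined above can then be queried like any other Beam SQL `PCollection`, using the fixed __PCOLLECTION__ table name; the resulting `PCollection` already has its coder set by the query:

```
//query the inputTable built above; c1 and c3 refer to the row format defined there
PCollection<BeamSqlRow> filtered = inputTable.apply("filterOnC1",
    BeamSql.simpleQuery("SELECT c1, c3 FROM PCOLLECTION WHERE c1 = 1"));
```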
+
+## <a name="data-type"></a>4.3. Data Types
Each type in Beam SQL maps to a Java class to hold the value in `BeamSqlRow`. The following table lists the relation between SQL types and Java classes, which are supported in the current repository:

| SQL Type | Java class |
@@ -217,7 +248,7 @@ Each type in Beam SQL maps to a Java class to holds the value in `BeamSqlRow`. T
| Types.TIMESTAMP | java.util.Date |
{:.table}

-## <a name="built-in-functions"></a>4.3. built-in SQL functions
+## <a name="built-in-functions"></a>4.4. built-in SQL functions
Beam SQL has implemented lots of built-in functions defined in [Apache Calcite](http://calcite.apache.org). The available functions are listed below:
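Ahead of that list, here is a hypothetical query for flavor; `ABS` and `CHAR_LENGTH` are standard Calcite built-ins assumed here for illustration, and `f_string` is an illustrative column name, so check the published list before relying on any particular function:

```
//illustrative only: assumes ABS and CHAR_LENGTH are among the implemented built-ins
String sql = "SELECT ABS(f_int) AS abs_int, CHAR_LENGTH(f_string) AS name_len FROM PCOLLECTION";
PCollection<BeamSqlRow> result = input.apply("builtinFunctions", BeamSql.simpleQuery(sql));
```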