This is an automated email from the ASF dual-hosted git repository.

mergebot-role pushed a commit to branch mergebot
in repository https://gitbox.apache.org/repos/asf/beam-site.git

commit 2a52cb3af842addcb08e554d21671f9feb1d58a4
Author: mingmxu <ming...@ebay.com>
AuthorDate: Thu Aug 17 17:16:05 2017 -0700

    update to latest API/usages
---
 src/_posts/2017-07-21-sql-dsl.md |  33 +++++++------
 src/documentation/dsls/sql.md    | 102 ++++++++++++++++++++-------------------
 2 files changed, 70 insertions(+), 65 deletions(-)

diff --git a/src/_posts/2017-07-21-sql-dsl.md b/src/_posts/2017-07-21-sql-dsl.md
index 4aa97d1..95df3f8 100644
--- a/src/_posts/2017-07-21-sql-dsl.md
+++ b/src/_posts/2017-07-21-sql-dsl.md
@@ -13,42 +13,45 @@ Beam SQL DSL provides the capability to execute standard SQL queries using Beam
 <!--more-->
 
 # <a name="overview"></a>1. Overview
-SQL is a well-adopted standard to process data with concise syntax. With DSL 
APIs, now `PCollection`s can be queried with standard SQL statements, like a 
regular table. The DSL APIs leverage [Apache 
Calcite](http://calcite.apache.org/) to parse and optimize SQL queries, then 
translate into a composite Beam `PTransform`. In this way, both SQL and normal 
Beam `PTransform`s can be mixed in the same pipeline.
+SQL is a well-adopted standard to process data with concise syntax. With DSL APIs, `PCollection`s can now be queried with standard SQL statements, like a regular table. The DSL APIs leverage [Apache Calcite](http://calcite.apache.org/) to parse and optimize SQL queries, then translate them into a _composite_ Beam `PTransform`. In this way, both SQL and normal Beam `PTransform`s can be mixed in one pipeline.
 
 # <a name="usage"></a>2. Usage of DSL APIs 
-The DSL interface (`BeamSql.query()` and `BeamSql.simpleQuery()`), is the only 
endpoint exposed to developers. It wraps the back-end details of 
parsing/validation/assembling, to deliver a Beam SDK style API that can take 
either simple TABLE_FILTER queries or complex queries containing JOIN/GROUP_BY 
etc. 
+`BeamSql` is the only interface (with two methods, `BeamSql.query()` and `BeamSql.simpleQuery()`) exposed to developers. It wraps the back-end details of parsing/validation/assembling, and delivers a Beam SDK style API that can take either simple TABLE_FILTER queries or complex queries containing JOIN/GROUP_BY etc.
 
-*Note*, the two APIs are equivalent in functionality, `BeamSql.query()` 
applies on a `PCollectionTuple` with one or many input `PCollection`s, and 
`BeamSql.simpleQuery()` is a simplified API which applies on single 
`PCollection`.
+*Note*: the two methods are equivalent in functionality. `BeamSql.query()` applies to a `PCollectionTuple` with one or many input `PCollection`s; `BeamSql.simpleQuery()` is a simplified API which applies to a single `PCollection`.
 
 
[BeamSqlExample](https://github.com/apache/beam/blob/DSL_SQL/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java)
 in code repository shows the usage of both APIs:
 
 ```
 //Step 1. create a source PCollection with Create.of();
-PCollection<BeamSqlRow> inputTable = PBegin.in(p).apply(Create.of(row)
-    .withCoder(new BeamSqlRowCoder(type)));
+BeamRecordSqlType type = BeamRecordSqlType.create(fieldNames, fieldTypes);
+...
+
+PCollection<BeamRecord> inputTable = PBegin.in(p).apply(Create.of(row...)
+    .withCoder(type.getRecordCoder()));
 
 //Step 2. (Case 1) run a simple SQL query over input PCollection with BeamSql.simpleQuery;
-PCollection<BeamSqlRow> outputStream = inputTable.apply(
-    BeamSql.simpleQuery("select c2, c3 from PCOLLECTION where c1=1"));
+PCollection<BeamRecord> outputStream = inputTable.apply(
+    BeamSql.simpleQuery("select c1, c2, c3 from PCOLLECTION where c1=1"));
 
 
-//Step 2. (Case 2) run the query with BeamSql.query
-PCollection<BeamSqlRow> outputStream2 =
-    PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_B"), inputTable)
-        .apply(BeamSql.query("select c2, c3 from TABLE_B where c1=1"));
+//Step 2. (Case 2) run the query with BeamSql.query over result PCollection of (case 1);
+PCollection<BeamRecord> outputStream2 =
+    PCollectionTuple.of(new TupleTag<BeamRecord>("CASE1_RESULT"), outputStream)
+        .apply(BeamSql.query("select c2, sum(c3) from CASE1_RESULT group by c2"));
 ```
 
-In Step 1, a `PCollection<BeamSqlRow>` is prepared as the source dataset. The 
work to generate a queriable `PCollection<BeamSqlRow>` is beyond the scope of 
Beam SQL DSL. 
+In Step 1, a `PCollection<BeamRecord>` is prepared as the source dataset. The 
work to generate a queriable `PCollection<BeamRecord>` is beyond the scope of 
Beam SQL DSL. 
 
-Step 2(Case 1) shows the way to run a query with `BeamSql.simpleQuery()`, be 
aware that the input `PCollection` is named with a fixed table name 
__PCOLLECTION__. Step 2(Case 2) is an example to run a query with 
`BeamSql.query()`. A Table name used in the query is specified when adding 
`PCollection` to `PCollectionTuple`. As each call of either `BeamSql.query()` 
or `BeamSql.simpleQuery()` has its own schema repository, developers need to 
include all `PCollection`s that would be used in yo [...]
+Step 2 (Case 1) shows how to run a query with `BeamSql.simpleQuery()`; be aware that the input `PCollection` is named with a fixed table name __PCOLLECTION__. Step 2 (Case 2) is another example, running a query with `BeamSql.query()`. A table name is specified when adding a `PCollection` to the `PCollectionTuple`. As each call of either `BeamSql.query()` or `BeamSql.simpleQuery()` has its own schema repository, developers need to include all `PCollection`s that would be used in the query.
 
 # <a name="functionality"></a>4. Functionality in Beam SQL
 Just as the unified model for both bounded and unbounded data in Beam, SQL DSL 
provides the same functionalities for bounded and unbounded `PCollection` as 
well. 
 
-Note that, SQL support is not fully completed. Queries that include 
unsupported features would cause a UnsupportedOperationException.
+Note that SQL support is not yet complete. Queries that include unsupported features cause an `UnsupportedOperationException`.
 
 ## <a name="features"></a>4.1. Supported Features
-The following features are supported in current repository (this chapter will 
be updated continually).
+The following features are supported in the current repository:
 
 1. filter clauses;
 2. data field projections;
diff --git a/src/documentation/dsls/sql.md b/src/documentation/dsls/sql.md
index 34f26ae..bfcbd1c 100644
--- a/src/documentation/dsls/sql.md
+++ b/src/documentation/dsls/sql.md
@@ -54,39 +54,42 @@ pCollection.apply(BeamSqlFilter...)
 ```
 
 # <a name="usage"></a>3. Usage of DSL APIs 
-The DSL interface (`BeamSql.query()` and `BeamSql.simpleQuery()`), is the only 
endpoint exposed to developers. It wraps the back-end details of 
parsing/validation/assembling, to deliver a Beam SDK style API that can take 
either simple TABLE_FILTER queries or complex queries containing JOIN/GROUP_BY 
etc. 
+`BeamSql` is the only interface (with two methods, `BeamSql.query()` and `BeamSql.simpleQuery()`) exposed to developers. It wraps the back-end details of parsing/validation/assembling, and delivers a Beam SDK style API that can take either simple TABLE_FILTER queries or complex queries containing JOIN/GROUP_BY etc.
 
-*Note*, the two APIs are equivalent in functionality, `BeamSql.query()` 
applies on a `PCollectionTuple` with one or many input `PCollection`s, and 
`BeamSql.simpleQuery()` is a simplified API which applies on single 
`PCollection`.
+*Note*: the two methods are equivalent in functionality. `BeamSql.query()` applies to a `PCollectionTuple` with one or many input `PCollection`s; `BeamSql.simpleQuery()` is a simplified API which applies to a single `PCollection`.
 
 
[BeamSqlExample](https://github.com/apache/beam/blob/DSL_SQL/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java)
 in code repository shows the usage of both APIs:
 
 ```
 //Step 1. create a source PCollection with Create.of();
-PCollection<BeamSqlRow> inputTable = PBegin.in(p).apply(Create.of(row)
-    .withCoder(new BeamSqlRowCoder(type)));
+BeamRecordSqlType type = BeamRecordSqlType.create(fieldNames, fieldTypes);
+...
+
+PCollection<BeamRecord> inputTable = PBegin.in(p).apply(Create.of(row...)
+    .withCoder(type.getRecordCoder()));
 
 //Step 2. (Case 1) run a simple SQL query over input PCollection with BeamSql.simpleQuery;
-PCollection<BeamSqlRow> outputStream = inputTable.apply(
-    BeamSql.simpleQuery("select c2, c3 from PCOLLECTION where c1=1"));
+PCollection<BeamRecord> outputStream = inputTable.apply(
+    BeamSql.simpleQuery("select c1, c2, c3 from PCOLLECTION where c1=1"));
 
 
-//Step 2. (Case 2) run the query with BeamSql.query
-PCollection<BeamSqlRow> outputStream2 =
-    PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_B"), inputTable)
-        .apply(BeamSql.query("select c2, c3 from TABLE_B where c1=1"));
+//Step 2. (Case 2) run the query with BeamSql.query over result PCollection of (case 1);
+PCollection<BeamRecord> outputStream2 =
+    PCollectionTuple.of(new TupleTag<BeamRecord>("CASE1_RESULT"), outputStream)
+        .apply(BeamSql.query("select c2, sum(c3) from CASE1_RESULT group by c2"));
 ```
 
-In Step 1, a `PCollection<BeamSqlRow>` is prepared as the source dataset. The 
work to generate a queriable `PCollection<BeamSqlRow>` is beyond the scope of 
Beam SQL DSL. 
+In Step 1, a `PCollection<BeamRecord>` is prepared as the source dataset. The 
work to generate a queriable `PCollection<BeamRecord>` is beyond the scope of 
Beam SQL DSL. 
 
-Step 2(Case 1) shows the way to run a query with `BeamSql.simpleQuery()`, be 
aware that the input `PCollection` is named with a fixed table name 
__PCOLLECTION__. Step 2(Case 2) is an example to run a query with 
`BeamSql.query()`. A Table name used in the query is specified when adding 
`PCollection` to `PCollectionTuple`. As each call of either `BeamSql.query()` 
or `BeamSql.simpleQuery()` has its own schema repository, developers need to 
include all `PCollection`s that would be used in yo [...]
+Step 2 (Case 1) shows how to run a query with `BeamSql.simpleQuery()`; be aware that the input `PCollection` is named with a fixed table name __PCOLLECTION__. Step 2 (Case 2) is another example, running a query with `BeamSql.query()`. A table name is specified when adding a `PCollection` to the `PCollectionTuple`. As each call of either `BeamSql.query()` or `BeamSql.simpleQuery()` has its own schema repository, developers need to include all `PCollection`s that would be used in the query.
 
 # <a name="functionality"></a>4. Functionality in Beam SQL
 Just as the unified model for both bounded and unbounded data in Beam, SQL DSL 
provides the same functionalities for bounded and unbounded `PCollection` as 
well. 
 
-Note that, SQL support is not fully completed. Queries that include 
unsupported features would cause a UnsupportedOperationException.
+Note that SQL support is not yet complete. Queries that include unsupported features cause an `UnsupportedOperationException`.
 
 ## <a name="features"></a>4.1. Supported Features
-The following features are supported in current repository (this chapter will 
be updated continually).
+The following features are supported in the current repository:
 
 **1. filter clauses;**
 
@@ -94,7 +97,7 @@ The following features are supported in current repository (this chapter will be
 
 **3. aggregations;**
 
-Beam SQL supports aggregation functions COUNT/SUM/MAX/MIN/AVG with group_by in 
global_window, fixed_window, sliding_window and session_window. A field with 
type `TIMESTAMP` is required to specify fixed_window/sliding_window/session 
window, which is used as event timestamp for rows. See below for several 
examples:
+Beam SQL supports aggregation functions with group_by in global_window, fixed_window, sliding_window and session_window. A field with type `TIMESTAMP` is required to specify fixed_window/sliding_window/session_window; this field is used as the event timestamp for rows. See below for several examples:
 
 ```
 //fixed window, one hour in duration
@@ -126,12 +129,12 @@ The scenarios of join can be categorized into 3 cases:
 2. UnboundedTable JOIN UnboundedTable
 3. BoundedTable JOIN UnboundedTable
 
-For case 1 and case 2, a standard join is utilized as long as the windowFn of 
the both sides match. For case 3, sideInput is utilized to implement the join. 
So there are some constraints:
+For case 1 and case 2, a standard join is utilized as long as the windowFn of both sides match. For case 3, sideInput is utilized to implement the join. So far there are some constraints:
 
 * Only equal-join is supported, CROSS JOIN is not supported;
 * FULL OUTER JOIN is not supported;
 * If it's a LEFT OUTER JOIN, the unbounded table should on the left side; If 
it's a RIGHT OUTER JOIN, the unbounded table should on the right side;
-* The trigger is inherented from upstream, which should be consistent;
+* The window/trigger is inherited from upstream, which should be consistent;
 
 **5. built-in SQL functions**
 
@@ -141,7 +144,7 @@ If the required function is not available, developers can register their own UDF
 
 **create and specify User Defined Function (UDF)**
 
-A UDF can be any Java method that takes zero or more scalar fields, and return 
one scalar value. Below is an example of UDF and how to use it in DSL:
+A UDF can be 1) any Java method that takes zero or more scalar fields and returns one scalar value, or 2) a `SerializableFunction`. Below is an example of a UDF and how to use it in the DSL:
 
 ```
 /**
@@ -153,44 +156,45 @@ public static class CubicInteger implements BeamSqlUdf{
   }
 }
 
+/**
+ * Another example UDF with {@link SerializableFunction}.
+ */
+public static class CubicIntegerFn implements SerializableFunction<Integer, Integer> {
+  @Override
+  public Integer apply(Integer input) {
+    return input * input * input;
+  }
+}
+
 // register and call in SQL
-String sql = "SELECT f_int, cubic(f_int) as cubicvalue FROM PCOLLECTION WHERE f_int = 2";
+String sql = "SELECT f_int, cubic1(f_int) as cubicvalue1, cubic2(f_int) as cubicvalue2 FROM PCOLLECTION WHERE f_int = 2";
 PCollection<BeamSqlRow> result =
     input.apply("udfExample",
-        BeamSql.simpleQuery(sql).withUdf("cubic", CubicInteger.class));
+        BeamSql.simpleQuery(sql).withUdf("cubic1", CubicInteger.class)
+                                       .withUdf("cubic2", new CubicIntegerFn()));
 ```
 
 **create and specify User Defined Aggregate Function (UDAF)**
 
-A UDAF aggregates a set of grouped scalar values, and output a single scalar 
value. To create a UDAF function, it's required to extend 
`org.apache.beam.dsls.sql.schema.BeamSqlUdaf<InputT, AccumT, OutputT>`, which 
defines 4 methods to process an aggregation:
-
-1. init(), to create an initial accumulate value;
-2. add(), to apply a new value to existing accumulate value;
-3. merge(), to merge accumulate values from parallel operators;
-4. result(), to generate the final result from accumulate value;
-
-Here's an example of UDAF:
+Beam SQL can accept a `CombineFn` as a UDAF. Here's an example of a UDAF:
 
 ```
 /**
- * UDAF for test, which returns the sum of square.
+ * UDAF (CombineFn) for test, which returns the sum of squares.
  */
-public static class SquareSum extends BeamSqlUdaf<Integer, Integer, Integer> {
-  public SquareSum() {
-  }
-  
+public static class SquareSum extends CombineFn<Integer, Integer, Integer> {
   @Override
-  public Integer init() {
+  public Integer createAccumulator() {
     return 0;
   }
-  
+
   @Override
-  public Integer add(Integer accumulator, Integer input) {
+  public Integer addInput(Integer accumulator, Integer input) {
     return accumulator + input * input;
   }
-  
+
   @Override
-  public Integer merge(Iterable<Integer> accumulators) {
+  public Integer mergeAccumulators(Iterable<Integer> accumulators) {
     int v = 0;
     Iterator<Integer> ite = accumulators.iterator();
     while (ite.hasNext()) {
@@ -200,20 +204,21 @@ public static class SquareSum extends BeamSqlUdaf<Integer, Integer, Integer> {
   }
 
   @Override
-  public Integer result(Integer accumulator) {
+  public Integer extractOutput(Integer accumulator) {
     return accumulator;
   }
+
 }
 
 //register and call in SQL
 String sql = "SELECT f_int1, squaresum(f_int2) AS `squaresum` FROM PCOLLECTION GROUP BY f_int2";
 PCollection<BeamSqlRow> result =
     input.apply("udafExample",
-        BeamSql.simpleQuery(sql).withUdaf("squaresum", SquareSum.class));
+        BeamSql.simpleQuery(sql).withUdaf("squaresum", new SquareSum()));
 ```
  
-## <a name="beamsqlrow"></a>4.2. Intro of BeamSqlRow
-`BeamSqlRow`, encoded/decoded by `BeamSqlRowCoder`, represents a single, 
implicitly structured data item in a Beam SQL compatible `PCollection`. Similar 
as _row_ in the context of relational database, each `BeamSqlRow` consists of 
named columns with fixed types(see [4.3. Data Types](#data-type)).
+## <a name="beamsqlrow"></a>4.2. Intro of BeamRecord
+`BeamRecord`, described by `BeamRecordType` (extended as `BeamRecordSqlType` in Beam SQL) and encoded/decoded by `BeamRecordCoder`, represents a single, immutable row in a Beam SQL `PCollection`. Similar to a _row_ in a relational database, each `BeamRecord` consists of named columns with fixed types (see [4.3. Data Types](#data-type)).
 
 A Beam SQL `PCollection` can be created from an external source, in-memory 
data or derive from another SQL query. For `PCollection`s from external source 
or in-memory data, it's required to specify coder explcitly; `PCollection` 
derived from SQL query has the coder set already. Below is one example:
 
@@ -221,19 +226,16 @@ A Beam SQL `PCollection` can be created from an external source, in-memory data
 //define the input row format
 List<String> fieldNames = Arrays.asList("c1", "c2", "c3");
 List<Integer> fieldTypes = Arrays.asList(Types.INTEGER, Types.VARCHAR, 
Types.DOUBLE);
-BeamSqlRecordType type = BeamSqlRecordType.create(fieldNames, fieldTypes);
-BeamSqlRow row = new BeamSqlRow(type);
-row.addField(0, 1);
-row.addField(1, "row");
-row.addField(2, 1.0);
+BeamRecordSqlType type = BeamRecordSqlType.create(fieldNames, fieldTypes);
+BeamRecord row = new BeamRecord(type, 1, "row", 1.0);
 
 //create a source PCollection with Create.of();
-PCollection<BeamSqlRow> inputTable = PBegin.in(p).apply(Create.of(row)
-    .withCoder(new BeamSqlRowCoder(type)));
+PCollection<BeamRecord> inputTable = PBegin.in(p).apply(Create.of(row)
+    .withCoder(type.getRecordCoder()));
 ```
  
 ## <a name="data-type"></a>4.3. Data Types
-Each type in Beam SQL maps to a Java class to holds the value in `BeamSqlRow`. 
The following table lists the relation between SQL types and Java classes, 
which are supported in current repository:
+Each type in Beam SQL maps to a Java class that holds the value in a `BeamRecord`. The following table lists the relation between SQL types and Java classes that are supported in the current repository:
 
 | SQL Type | Java class |
 | ---- | ---- |

