sjwiesman commented on a change in pull request #8903: [FLINK-12747][docs] Getting Started - Table API Example Walkthrough
URL: https://github.com/apache/flink/pull/8903#discussion_r298206821
########## File path: docs/getting-started/tutorials/table_api.md ##########

@@ -0,0 +1,250 @@
---
title: "Table API"
nav-id: tableapitutorials
nav-title: 'Table API'
nav-parent_id: apitutorials
nav-pos: 1
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

Apache Flink offers a Table API as a unified, relational API for batch and stream processing, i.e., queries are executed with the same semantics on unbounded, real-time streams or bounded, recorded streams and produce the same results.
The Table API in Flink is commonly used to ease the definition of data analytics, data pipelining, and ETL applications.

* This will be replaced by the TOC
{:toc}

## What Are We Building?

In this tutorial, we'll show how to build a continuous ETL pipeline for tracking financial transactions by account over time.
We will start by building our report as a nightly batch job, and then migrate to a streaming pipeline to see how batch is just a special case of streaming.

## Prerequisites

We'll assume that you have some familiarity with Java or Scala, but you should be able to follow along even if you're coming from a different programming language.
We'll also assume that you're familiar with basic relational concepts such as `SELECT` and `GROUP BY` clauses.

If you want to follow along, you will need a computer with:

* Java 8
* Maven

## Help, I’m Stuck!

If you get stuck, check out the [community support resources](https://flink.apache.org/community.html).
In particular, Apache Flink's [user mailing list](https://flink.apache.org/community.html#mailing-lists) is consistently ranked as one of the most active of any Apache project and a great way to get help quickly.

## Setting up a Maven Project

We are going to use a Flink Maven Archetype to create our project structure.

{% highlight bash %}
$ mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-walkthrough-table \{% unless site.is_stable %}
    -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \{% endunless %}
    -DarchetypeVersion={{ site.version }} \
    -DgroupId=spend-report \
    -DartifactId=spend-report \
    -Dversion=0.1 \
    -Dpackage=spendreport \
    -DinteractiveMode=false
{% endhighlight %}

{% unless site.is_stable %}
<p style="border-radius: 5px; padding: 5px" class="bg-danger">
    <b>Note</b>: For Maven 3.0 or higher, it is no longer possible to specify the repository (-DarchetypeCatalog) via the command line. If you wish to use the snapshot repository, you need to add a repository entry to your settings.xml. For details about this change, please refer to the <a href="http://maven.apache.org/archetype/maven-archetype-plugin/archetype-repository.html">official Maven documentation</a>.
</p>
{% endunless %}

You can edit the `groupId`, `artifactId`, and `package` if you like. With the above parameters,
Maven will create a project with all the dependencies needed to complete this tutorial.
After importing the project into your editor, you will see a file containing the following code.

{% highlight java %}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

tEnv.registerTableSource("transactions", new TransactionTableSource());
tEnv.registerTableSink("stdout", new StdOutTableSink());
tEnv.registerFunction("truncateDateToHour", new TruncateDateToHour());

tEnv
    .scan("transactions")
    .insertInto("stdout");

env.execute("Spend Report");
{% endhighlight %}

Let's break down this code by component.

## Breaking Down The Code

#### The Execution Environment

The first two lines set up our `ExecutionEnvironment`.
The execution environment is how we set properties for our deployment, specify whether we are writing a batch or a streaming application, and create our sources.
Here we have chosen the batch environment since we are building a periodic batch report.
We then wrap it in a `BatchTableEnvironment` to have full access to the Table API.

{% highlight java %}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
{% endhighlight %}

#### Registering Tables

Next, we register the tables that we can use for the input and output of our application.
The `TableEnvironment` maintains a catalog of tables that are registered by name. There are two types of tables: input tables and output tables.
Input tables can be referenced in Table API and SQL queries and provide input data.
Output tables can be used to emit the result of a Table API or SQL query to an external system.
Tables can support batch queries, streaming queries, or both.

{% highlight java %}
tEnv.registerTableSource("transactions", new TransactionTableSource());
tEnv.registerTableSink("stdout", new StdOutTableSink());
{% endhighlight %}

We register two tables: a `transactions` input table and a `stdout` output table.
The `transactions` table lets us read credit card transactions; each transaction contains an `accountId`, a `timestamp`, and a dollar `amount`.
The `stdout` table writes the output of a job to standard out so we can easily see our results.
Both tables can be used in batch and streaming applications.
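The `TransactionTableSource` and `StdOutTableSink` classes used above ship with the generated project, so you do not need to implement them yourself. To give a feel for what is behind a registered table, here is a minimal, hypothetical sketch of a batch-only source with the same three columns; the class name, sample rows, and exact field types are illustrative assumptions rather than the archetype's actual code.

{% highlight java %}
import java.sql.Timestamp;
import java.util.Arrays;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sources.BatchTableSource;
import org.apache.flink.types.Row;

/**
 * Hypothetical example only; the archetype provides its own TransactionTableSource.
 * A BatchTableSource produces a bounded DataSet of rows for the table it backs.
 */
public class ExampleTransactionSource implements BatchTableSource<Row> {

    @Override
    public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) {
        // Two hard-coded sample transactions; a real source would read from
        // a file, database, or message queue instead.
        return execEnv.fromCollection(
            Arrays.asList(
                Row.of(1L, new Timestamp(1546272000000L), 188.23),
                Row.of(2L, new Timestamp(1546272060000L), 374.79)),
            getReturnType());
    }

    @Override
    public TypeInformation<Row> getReturnType() {
        // Field names match the table schema so the planner can map them directly.
        return Types.ROW_NAMED(
            new String[] {"accountId", "timestamp", "amount"},
            Types.LONG, Types.SQL_TIMESTAMP, Types.DOUBLE);
    }

    @Override
    public TableSchema getTableSchema() {
        return TableSchema.builder()
            .field("accountId", Types.LONG)
            .field("timestamp", Types.SQL_TIMESTAMP)
            .field("amount", Types.DOUBLE)
            .build();
    }
}
{% endhighlight %}

A sink such as `StdOutTableSink` plays the mirror-image role, consuming rows and writing them to an external system (in this case, standard out).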
#### Registering A UDF

Along with the tables, we register a user-defined function (UDF) for working with timestamps. The function takes a timestamp and rounds it down to the nearest hour.

{% highlight java %}
tEnv.registerFunction("truncateDateToHour", new TruncateDateToHour());
{% endhighlight %}
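Like the table source and sink, `TruncateDateToHour` comes with the generated project. For reference, a scalar UDF of this kind could look roughly like the following minimal sketch (an illustration under assumed types, not necessarily the archetype's exact implementation): it extends Flink's `ScalarFunction` and exposes a public `eval` method that the Table API calls for every row.

{% highlight java %}
import java.sql.Timestamp;

import org.apache.flink.table.functions.ScalarFunction;

/**
 * Hypothetical sketch of a scalar UDF that truncates a timestamp
 * to the beginning of its hour.
 */
public class TruncateDateToHour extends ScalarFunction {

    private static final long MILLIS_PER_HOUR = 60L * 60L * 1000L;

    public Timestamp eval(Timestamp timestamp) {
        // Drop minutes, seconds, and milliseconds by rounding down to the hour.
        long millis = timestamp.getTime();
        return new Timestamp(millis - (millis % MILLIS_PER_HOUR));
    }
}
{% endhighlight %}

Because it is registered under the name `truncateDateToHour`, the function can be called from Table API expressions, which is exactly what we will do in the query below.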
#### The Query

With our environment configured and tables registered, we are ready to build our first application.
From the `TableEnvironment` we can `scan` an input table to read its rows and then write those results into an output table using `insertInto`.

{% highlight java %}
tEnv
    .scan("transactions")
    .insertInto("stdout");
{% endhighlight %}

This initial job simply reads all transactions and writes them to standard out.

#### Execute

Flink applications are built lazily and shipped to the cluster for execution only once fully formed.
We call `ExecutionEnvironment#execute` to begin the execution of our job.

{% highlight java %}
env.execute("Spend Report");
{% endhighlight %}

## Attempt One

Now that we have the skeleton of a job, let's add some business logic.
We want a report that shows the total spend for each account across each hour of the day.
Just like a SQL query, we can select the required fields and group by our keys.
Because the timestamp field has millisecond granularity, we will use our UDF to round it down to the nearest hour.
Finally, we will select all the fields, summing the total spend per account-hour pair.

{% highlight java %}
tEnv
    .scan("transactions")
    .select("accountId, timestamp.truncateDateToHour as timestamp, amount")
    .groupBy("accountId, timestamp")
    .select("accountId, timestamp, amount.sum as total")
    .insertInto("stdout");
{% endhighlight %}

## Adding Windows

While this works, we can do better.
The `timestamp` column represents the [event time]({{ site.baseurl }}/dev/event_time.html) of each row, where event time is the logical time when an event took place in the real world.
Flink understands the concept of event time, which we can leverage to clean up our code.

Bucketing data based on time is a very common operation in data processing, especially when working with infinite streams.

Review comment:
They both make sense to me, but I'm happy to change it.