This is an automated email from the ASF dual-hosted git repository. nkruber pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new f4943dd [FLINK-12747][docs] Getting Started - Table Api Walkthrough f4943dd is described below commit f4943dd06b2ff7da899812a2aaa2e0b24c2afc01 Author: Seth Wiesman <sjwies...@gmail.com> AuthorDate: Wed Jun 26 14:48:38 2019 -0500 [FLINK-12747][docs] Getting Started - Table Api Walkthrough --- docs/getting-started/examples/index.md | 2 +- docs/getting-started/examples/index.zh.md | 2 +- docs/getting-started/tutorials/index.md | 2 +- docs/getting-started/tutorials/index.zh.md | 2 +- .../{tutorials => walkthroughs}/index.md | 8 +- .../{tutorials => walkthroughs}/index.zh.md | 8 +- docs/getting-started/walkthroughs/table_api.md | 494 +++++++++++++++++++++ docs/getting-started/walkthroughs/table_api.zh.md | 494 +++++++++++++++++++++ flink-end-to-end-tests/run-nightly-tests.sh | 3 + .../test-scripts/test_table_walkthroughs.sh | 74 +++ .../flink-walkthrough-common/pom.xml | 59 +++ .../walkthrough/common/entity/Transaction.java | 93 ++++ .../common/sink/LoggerOutputFormat.java | 50 +++ .../common/source/TransactionIterator.java | 130 ++++++ .../common/source/TransactionRowInputFormat.java | 60 +++ .../common/source/TransactionSource.java | 65 +++ .../table/BoundedTransactionTableSource.java | 55 +++ .../common/table/SpendReportTableSink.java | 95 ++++ .../common/table/TruncateDateToHour.java | 46 ++ .../table/UnboundedTransactionTableSource.java | 89 ++++ .../flink-walkthrough-table-java/pom.xml | 26 +- .../META-INF/maven/archetype-metadata.xml | 25 +- .../src/main/resources/archetype-resources/pom.xml | 263 +++++++++++ .../src/main/java/SpendReport.java | 45 ++ .../src/main/resources/log4j.properties | 24 + .../flink-walkthrough-table-scala/pom.xml | 26 +- .../META-INF/maven/archetype-metadata.xml | 25 +- .../src/main/resources/archetype-resources/pom.xml | 300 +++++++++++++ .../src/main/resources/log4j.properties | 24 + .../src/main/scala/SpendReport.scala | 41 ++ flink-walkthroughs/pom.xml | 95 ++++ pom.xml | 1 + 32 files changed, 2686 insertions(+), 40 deletions(-) diff --git a/docs/getting-started/examples/index.md b/docs/getting-started/examples/index.md index 6810bbd..d4d315c 100644 --- a/docs/getting-started/examples/index.md +++ b/docs/getting-started/examples/index.md @@ -3,7 +3,7 @@ title: Examples nav-id: examples nav-title: '<i class="fa fa-file-code-o title appetizer" aria-hidden="true"></i> Examples' nav-parent_id: getting-started -nav-pos: 2 +nav-pos: 3 nav-show_overview: true --- <!-- diff --git a/docs/getting-started/examples/index.zh.md b/docs/getting-started/examples/index.zh.md index 3bca74a..e0925d8 100644 --- a/docs/getting-started/examples/index.zh.md +++ b/docs/getting-started/examples/index.zh.md @@ -3,7 +3,7 @@ title: 示例 nav-id: examples nav-title: '<i class="fa fa-file-code-o title appetizer" aria-hidden="true"></i> 示例' nav-parent_id: getting-started -nav-pos: 2 +nav-pos: 3 nav-show_overview: true --- <!-- diff --git a/docs/getting-started/tutorials/index.md b/docs/getting-started/tutorials/index.md index 9b0c8f4..f283489 100644 --- a/docs/getting-started/tutorials/index.md +++ b/docs/getting-started/tutorials/index.md @@ -3,7 +3,7 @@ title: "Tutorials" nav-id: tutorials nav-title: '<i class="fa fa-power-off title appetizer" aria-hidden="true"></i> Tutorials' nav-parent_id: getting-started -nav-pos: 1 +nav-pos: 2 --- <!-- Licensed to the Apache Software Foundation (ASF) under one diff --git a/docs/getting-started/tutorials/index.zh.md b/docs/getting-started/tutorials/index.zh.md index 
540a6c4..ac6715b 100644 --- a/docs/getting-started/tutorials/index.zh.md +++ b/docs/getting-started/tutorials/index.zh.md @@ -3,7 +3,7 @@ title: "教程" nav-id: tutorials nav-title: '<i class="fa fa-power-off title appetizer" aria-hidden="true"></i> 教程' nav-parent_id: getting-started -nav-pos: 1 +nav-pos: 2 --- <!-- Licensed to the Apache Software Foundation (ASF) under one diff --git a/docs/getting-started/tutorials/index.md b/docs/getting-started/walkthroughs/index.md similarity index 84% copy from docs/getting-started/tutorials/index.md copy to docs/getting-started/walkthroughs/index.md index 9b0c8f4..ae2f536 100644 --- a/docs/getting-started/tutorials/index.md +++ b/docs/getting-started/walkthroughs/index.md @@ -1,7 +1,7 @@ --- -title: "Tutorials" -nav-id: tutorials -nav-title: '<i class="fa fa-power-off title appetizer" aria-hidden="true"></i> Tutorials' +title: "Code Walkthroughs" +nav-id: walkthroughs +nav-title: '<i class="fa fa-bolt title appetizer" aria-hidden="true"></i> Code Walkthroughs' nav-parent_id: getting-started nav-pos: 1 --- @@ -22,4 +22,4 @@ software distributed under the License is distributed on an KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. ---> +--> \ No newline at end of file diff --git a/docs/getting-started/tutorials/index.zh.md b/docs/getting-started/walkthroughs/index.zh.md similarity index 84% copy from docs/getting-started/tutorials/index.zh.md copy to docs/getting-started/walkthroughs/index.zh.md index 540a6c4..ae2f536 100644 --- a/docs/getting-started/tutorials/index.zh.md +++ b/docs/getting-started/walkthroughs/index.zh.md @@ -1,7 +1,7 @@ --- -title: "教程" -nav-id: tutorials -nav-title: '<i class="fa fa-power-off title appetizer" aria-hidden="true"></i> 教程' +title: "Code Walkthroughs" +nav-id: walkthroughs +nav-title: '<i class="fa fa-bolt title appetizer" aria-hidden="true"></i> Code Walkthroughs' nav-parent_id: getting-started nav-pos: 1 --- @@ -22,4 +22,4 @@ software distributed under the License is distributed on an KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. ---> +--> \ No newline at end of file diff --git a/docs/getting-started/walkthroughs/table_api.md b/docs/getting-started/walkthroughs/table_api.md new file mode 100644 index 0000000..878fb54 --- /dev/null +++ b/docs/getting-started/walkthroughs/table_api.md @@ -0,0 +1,494 @@ +--- +title: "Table API" +nav-id: tableapiwalkthrough +nav-title: 'Table API' +nav-parent_id: walkthroughs +nav-pos: 1 +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> + +Apache Flink offers a Table API as a unified, relational API for batch and stream processing, i.e., queries are executed with the same semantics on unbounded, real-time streams or bounded, batch data sets and produce the same results. +The Table API in Flink is commonly used to ease the definition of data analytics, data pipelining, and ETL applications. + +* This will be replaced by the TOC +{:toc} + +## What Will You Be Building? + +In this tutorial, you will learn how to build a continuous ETL pipeline for tracking financial transactions by account over time. +You will start by building your report as a nightly batch job, and then migrate to a streaming pipeline. + +## Prerequisites + +This walkthrough assumes that you have some familiarity with Java or Scala, but you should be able to follow along even if you are coming from a different programming language. +It also assumes that you are familiar with basic relational concepts such as `SELECT` and `GROUP BY` clauses. + +## Help, I’m Stuck! + +If you get stuck, check out the [community support resources](https://flink.apache.org/community.html). +In particular, Apache Flink's [user mailing list](https://flink.apache.org/community.html#mailing-lists) is consistently ranked as one of the most active of any Apache project and a great way to get help quickly. + +## How To Follow Along + +If you want to follow along, you will require a computer with: + +* Java 8 +* Maven + +A provided Flink Maven Archetype will quickly create a skeleton project with all the necessary dependencies: + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight bash %} +$ mvn archetype:generate \ + -DarchetypeGroupId=org.apache.flink \ + -DarchetypeArtifactId=flink-walkthrough-table-java \{% unless site.is_stable %} + -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \{% endunless %} + -DarchetypeVersion={{ site.version }} \ + -DgroupId=spend-report \ + -DartifactId=spend-report \ + -Dversion=0.1 \ + -Dpackage=spendreport \ + -DinteractiveMode=false +{% endhighlight %} +</div> +<div data-lang="scala" markdown="1"> +{% highlight bash %} +$ mvn archetype:generate \ + -DarchetypeGroupId=org.apache.flink \ + -DarchetypeArtifactId=flink-walkthrough-table-scala \{% unless site.is_stable %} + -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \{% endunless %} + -DarchetypeVersion={{ site.version }} \ + -DgroupId=spend-report \ + -DartifactId=spend-report \ + -Dversion=0.1 \ + -Dpackage=spendreport \ + -DinteractiveMode=false +{% endhighlight %} +</div> +</div> + +{% unless site.is_stable %} +<p style="border-radius: 5px; padding: 5px" class="bg-danger"> + <b>Note</b>: For Maven 3.0 or higher, it is no longer possible to specify the repository (-DarchetypeCatalog) via the command line. If you wish to use the snapshot repository, you need to add a repository entry to your settings.xml. For details about this change, please refer to <a href="http://maven.apache.org/archetype/maven-archetype-plugin/archetype-repository.html">Maven's official documentation</a>. +</p> +{% endunless %} + +You can edit the `groupId`, `artifactId`, and `package` if you like. With the above parameters, +Maven will create a project with all the dependencies required to complete this tutorial. +After importing the project into your editor, you will see a file with the following code, which you can run directly inside your IDE. + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); +BatchTableEnvironment tEnv = BatchTableEnvironment.create(env); + +tEnv.registerTableSource("transactions", new BoundedTransactionTableSource()); +tEnv.registerTableSink("spend_report", new SpendReportTableSink()); +tEnv.registerFunction("truncateDateToHour", new TruncateDateToHour()); + +tEnv + .scan("transactions") + .insertInto("spend_report"); + +env.execute("Spend Report"); +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +val env = ExecutionEnvironment.getExecutionEnvironment +val tEnv = BatchTableEnvironment.create(env) + +tEnv.registerTableSource("transactions", new BoundedTransactionTableSource) +tEnv.registerTableSink("spend_report", new SpendReportTableSink) + +val truncateDateToHour = new TruncateDateToHour + +tEnv + .scan("transactions") + .insertInto("spend_report") + +env.execute("Spend Report") +{% endhighlight %} +</div> +</div>
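+ +The same pipeline can also be written in SQL rather than the fluent Table API. A minimal sketch, assuming the `sqlUpdate` method is available on the `TableEnvironment` in this Flink version: + +{% highlight java %} +// Hypothetical SQL equivalent of scan("transactions").insertInto("spend_report"). +tEnv.sqlUpdate("INSERT INTO spend_report SELECT * FROM transactions"); +{% endhighlight %}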
+ +## Breaking Down The Code + +#### The Execution Environment + +The first two lines set up your `ExecutionEnvironment`. +The execution environment is how you can set properties for your Job, specify whether you are writing a batch or a streaming application, and create your sources. +This walkthrough begins with the batch environment since you are building a periodic batch report. +It is then wrapped in a `BatchTableEnvironment` to have full access to the Table API. + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); +BatchTableEnvironment tEnv = BatchTableEnvironment.create(env); +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +val env = ExecutionEnvironment.getExecutionEnvironment +val tEnv = BatchTableEnvironment.create(env) +{% endhighlight %} +</div> +</div> + + +#### Registering Tables + +Next, tables are registered in the execution environment; you can use them to connect to external systems for reading and writing both batch and streaming data. +A table source provides access to data stored in external systems, such as a database, a key-value store, a message queue, or a file system. +A table sink emits a table to an external storage system. +Depending on the type of source and sink, they support different formats such as CSV, JSON, Avro, or Parquet. + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +tEnv.registerTableSource("transactions", new BoundedTransactionTableSource()); +tEnv.registerTableSink("spend_report", new SpendReportTableSink()); +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +tEnv.registerTableSource("transactions", new BoundedTransactionTableSource) +tEnv.registerTableSink("spend_report", new SpendReportTableSink) +{% endhighlight %} +</div> +</div> + +Two tables are registered: a transaction input table and a spend report output table. +The transactions (`transactions`) table lets us read credit card transactions, which contain account IDs (`accountId`), timestamps (`timestamp`), and US$ amounts (`amount`). +In this tutorial, the table is backed by data generated in memory to avoid any dependencies on external systems. +In practice, the `BoundedTransactionTableSource` may be backed by a filesystem, a database, or any other static source. +The spend report (`spend_report`) table logs each row with log level **INFO**, instead of writing to persistent storage, so you can easily see your results.
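+ +For orientation, the schema of the `transactions` table could be written out with Flink's `TableSchema` builder. This is only an illustrative sketch; the actual `BoundedTransactionTableSource` in `flink-walkthrough-common` defines its schema internally. + +{% highlight java %} +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.table.api.TableSchema; + +// Hypothetical sketch of the schema exposed by the transactions source. +TableSchema schema = TableSchema.builder() + .field("accountId", Types.LONG) // account identifier + .field("timestamp", Types.SQL_TIMESTAMP) // transaction time, millisecond granularity + .field("amount", Types.DOUBLE) // US$ amount + .build(); +{% endhighlight %}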
+ +#### Registering A UDF + +Along with the tables, a [user-defined function]({{ site.baseurl }}/dev/table/udfs.html) is registered for working with timestamps. +This function takes a timestamp and rounds it down to the nearest hour. + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +tEnv.registerFunction("truncateDateToHour", new TruncateDateToHour()); +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +val truncateDateToHour = new TruncateDateToHour +{% endhighlight %} +</div> +</div> + +#### The Query + +With the environment configured and tables registered, you are ready to build your first application. +From the `TableEnvironment` you can `scan` an input table to read its rows and then write those results into an output table using `insertInto`. + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +tEnv + .scan("transactions") + .insertInto("spend_report"); +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +tEnv + .scan("transactions") + .insertInto("spend_report") +{% endhighlight %} +</div> +</div> + +Initially, the Job reads all transactions and logs them out with log level **INFO**. + +#### Execute + +Flink applications are built lazily and shipped to the cluster for execution only once fully formed. +You begin the execution of your Job by calling `ExecutionEnvironment#execute` and giving it a name. + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +env.execute("Spend Report"); +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +env.execute("Spend Report") +{% endhighlight %} +</div> +</div> + +## Attempt One + +Now, with the skeleton of a Job set up, you are ready to add some business logic. +The goal is to build a report that shows the total spend for each account across each hour of the day. +Just like a SQL query, Flink can select the required fields and group by your keys. +Because the timestamp field has millisecond granularity, you can use the UDF to round it down to the nearest hour. +Finally, select all the fields, summing the total spend per account-hour pair with the built-in `sum` [aggregate function]({{ site.baseurl }}/dev/table/functions.html#aggregate-functions). + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +tEnv + .scan("transactions") + .select("accountId, timestamp.truncateDateToHour as timestamp, amount") + .groupBy("accountId, timestamp") + .select("accountId, timestamp, amount.sum as total") + .insertInto("spend_report"); +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +tEnv + .scan("transactions") + .select('accountId, truncateDateToHour('timestamp) as 'timestamp, 'amount) + .groupBy('accountId, 'timestamp) + .select('accountId, 'timestamp, 'amount.sum as 'total) + .insertInto("spend_report") +{% endhighlight %} +</div> +</div> + +This query consumes all records from the `transactions` table, calculates the report, and outputs the results in an efficient, scalable manner. + +{% highlight raw %} +# Query 1 output showing account id, timestamp, and amount + +> 1, 2019-01-01 00:00:00.0, $567.87 +> 2, 2019-01-01 00:00:00.0, $726.23 +> 1, 2019-01-01 01:00:00.0, $686.87 +> 2, 2019-01-01 01:00:00.0, $810.06 +> 1, 2019-01-01 02:00:00.0, $859.35 +> 2, 2019-01-01 02:00:00.0, $458.40 +> 1, 2019-01-01 03:00:00.0, $330.85 +> 2, 2019-01-01 03:00:00.0, $730.02 +> 1, 2019-01-01 04:00:00.0, $585.16 +> 2, 2019-01-01 04:00:00.0, $760.76 +{% endhighlight %}
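+ +As an aside, a scalar UDF such as `truncateDateToHour` comes down to very little code. The following is a minimal sketch, assuming the function maps a millisecond epoch timestamp to the start of its hour; the actual implementation ships in `flink-walkthrough-common` and may differ in detail. + +{% highlight java %} +import org.apache.flink.table.functions.ScalarFunction; + +// Hypothetical sketch: rounds a millisecond epoch timestamp down to the hour. +public class TruncateDateToHour extends ScalarFunction { + private static final long MILLIS_PER_HOUR = 60 * 60 * 1000L; + public long eval(long timestamp) { return timestamp - (timestamp % MILLIS_PER_HOUR); } +} +{% endhighlight %}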
+ +## Adding Windows + +Grouping data based on time is a typical operation in data processing, especially when working with infinite streams. +A grouping based on time is called a [window]({{ site.baseurl }}/dev/stream/operators/windows.html) and Flink offers flexible windowing semantics. +The most basic type of window is called a `Tumble` window, which has a fixed size and whose buckets do not overlap. + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +tEnv + .scan("transactions") + .window(Tumble.over("1.hour").on("timestamp").as("w")) + .groupBy("accountId, w") + .select("accountId, w.start as timestamp, amount.sum") + .insertInto("spend_report"); +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +tEnv + .scan("transactions") + .window(Tumble over 1.hour on 'timestamp as 'w) + .groupBy('accountId, 'w) + .select('accountId, 'w.start as 'timestamp, 'amount.sum) + .insertInto("spend_report") +{% endhighlight %} +</div> +</div> + +This defines your application as using one-hour tumbling windows based on the timestamp column. +So a row with timestamp `2019-06-01 01:23:47` is put in the `2019-06-01 01:00:00` window. + +Aggregations based on time are unique because time, as opposed to other attributes, generally moves forward in a continuous streaming application. +In a batch context, windows offer a convenient API for grouping records by a timestamp attribute. + +Running the updated query will produce the same results as before. + +{% highlight raw %} +# Query 2 output showing account id, timestamp, and amount + +> 1, 2019-01-01 00:00:00.0, $567.87 +> 2, 2019-01-01 00:00:00.0, $726.23 +> 1, 2019-01-01 01:00:00.0, $686.87 +> 2, 2019-01-01 01:00:00.0, $810.06 +> 1, 2019-01-01 02:00:00.0, $859.35 +> 2, 2019-01-01 02:00:00.0, $458.40 +> 1, 2019-01-01 03:00:00.0, $330.85 +> 2, 2019-01-01 03:00:00.0, $730.02 +> 1, 2019-01-01 04:00:00.0, $585.16 +> 2, 2019-01-01 04:00:00.0, $760.76 +{% endhighlight %}
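+ +To make the bucketing concrete, the window-start arithmetic can be spelled out in plain Java; this is a simplified sketch that ignores window offsets and time zones (Flink performs the assignment internally): + +{% highlight java %} +// Which one-hour window does an event fall into? Round its timestamp +// down to a multiple of the window size. +long windowSize = 60 * 60 * 1000L; // 1 hour in milliseconds +long eventTime = java.sql.Timestamp.valueOf("2019-06-01 01:23:47").getTime(); +long windowStart = eventTime - (eventTime % windowSize); // start of 2019-06-01 01:00:00 +{% endhighlight %}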
+ +## Once More, With Streaming! + +Because Flink's Table API offers consistent syntax and semantics for both batch and streaming, migrating from one to the other requires just two steps. + +The first step is to replace the batch `ExecutionEnvironment` with its streaming counterpart, `StreamExecutionEnvironment`, which creates a continuous streaming Job. +It includes stream-specific configurations, such as the time characteristic, which, when set to [event time]({{ site.baseurl }}/dev/event_time.html), guarantees consistent results even when faced with out-of-order events or a Job failure. +This is what will be used by your `Tumble` window when grouping records. + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + +StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + +val tEnv = StreamTableEnvironment.create(env) +{% endhighlight %} +</div> +</div> + +The second step is to migrate from a bounded data source to an infinite data source. +The project comes with an `UnboundedTransactionTableSource` that continuously creates transaction events in real time. +Similar to the `BoundedTransactionTableSource`, this table is backed by data generated in memory to avoid any dependencies on external systems. +In practice, this table might read from a streaming source such as Apache Kafka, AWS Kinesis, or Pravega. + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +tEnv.registerTableSource("transactions", new UnboundedTransactionTableSource()); +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +tEnv.registerTableSource("transactions", new UnboundedTransactionTableSource) +{% endhighlight %} +</div> +</div> + +And that's it, a fully functional, stateful, distributed streaming application! +The query continuously consumes the stream of transactions, computes the hourly spending, and emits results as soon as they are ready. +Since the input is unbounded, the query keeps running until it is manually stopped. +Because the Job uses time window-based aggregations, Flink can perform specific optimizations, such as state cleanup, when the framework knows that no more records will arrive for a particular window. + +{% highlight raw %} +# Query 3 output showing account id, timestamp, and amount + +# These rows are calculated continuously over the hour +# and output immediately at the end of the hour +> 1, 2019-01-01 00:00:00.0, $567.87 +> 2, 2019-01-01 00:00:00.0, $726.23 + +# Flink begins computing these rows as soon as +# the first record for the window arrives +> 1, 2019-01-01 01:00:00.0, $686.87 +> 2, 2019-01-01 01:00:00.0, $810.06 + +{% endhighlight %}
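+ +Note that this automatic cleanup is a property of windowed aggregations. For continuous, non-windowed queries you would instead bound state explicitly; a hedged sketch, assuming the `queryConfig` API is available on the streaming table environment in this version: + +{% highlight java %} +import org.apache.flink.api.common.time.Time; +import org.apache.flink.table.api.StreamQueryConfig; + +// Hypothetical sketch: evict state for keys that have been idle between one and two hours. +StreamQueryConfig queryConfig = tEnv.queryConfig(); +queryConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2)); +{% endhighlight %}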
+ +## Final Application + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +package spendreport; + +import org.apache.flink.walkthrough.common.table.SpendReportTableSink; +import org.apache.flink.walkthrough.common.table.UnboundedTransactionTableSource; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.Tumble; +import org.apache.flink.table.api.java.StreamTableEnvironment; + +public class SpendReport { + + public static void main(String[] args) throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + + tEnv.registerTableSource("transactions", new UnboundedTransactionTableSource()); + tEnv.registerTableSink("spend_report", new SpendReportTableSink()); + + tEnv + .scan("transactions") + .window(Tumble.over("1.hour").on("timestamp").as("w")) + .groupBy("accountId, w") + .select("accountId, w.start as timestamp, amount.sum") + .insertInto("spend_report"); + + env.execute("Spend Report"); + } +} +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +package spendreport + +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.table.api.Tumble +import org.apache.flink.table.api.scala._ +import org.apache.flink.walkthrough.common.table._ + +object SpendReport { + + def main(args: Array[String]): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + + val tEnv = StreamTableEnvironment.create(env) + + tEnv.registerTableSource("transactions", new UnboundedTransactionTableSource) + tEnv.registerTableSink("spend_report", new SpendReportTableSink) + + tEnv + .scan("transactions") + .window(Tumble over 1.hour on 'timestamp as 'w) + .groupBy('accountId, 'w) + .select('accountId, 'w.start as 'timestamp, 'amount.sum) + .insertInto("spend_report") + + env.execute("Spend Report") + } +} +{% endhighlight %} +</div> +</div> + diff --git a/docs/getting-started/walkthroughs/table_api.zh.md b/docs/getting-started/walkthroughs/table_api.zh.md new file mode 100644 index 0000000..878fb54 --- /dev/null +++ b/docs/getting-started/walkthroughs/table_api.zh.md @@ -0,0 +1,494 @@ +--- +title: "Table API" +nav-id: tableapiwalkthrough +nav-title: 'Table API' +nav-parent_id: walkthroughs +nav-pos: 1 +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License.
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +Apache Flink offers a Table API as a unified, relational API for batch and stream processing, i.e., queries are executed with the same semantics on unbounded, real-time streams or bounded, batch data sets and produce the same results. +The Table API in Flink is commonly used to ease the definition of data analytics, data pipelining, and ETL applications. + +* This will be replaced by the TOC +{:toc} + +## What Will You Be Building? + +In this tutorial, you will learn how to build a continuous ETL pipeline for tracking financial transactions by account over time. +You will start by building your report as a nightly batch job, and then migrate to a streaming pipeline. + +## Prerequisites + +This walkthrough assumes that you have some familiarity with Java or Scala, but you should be able to follow along even if you are coming from a different programming language. +It also assumes that you are familiar with basic relational concepts such as `SELECT` and `GROUP BY` clauses. + +## Help, I’m Stuck! + +If you get stuck, check out the [community support resources](https://flink.apache.org/community.html). +In particular, Apache Flink's [user mailing list](https://flink.apache.org/community.html#mailing-lists) is consistently ranked as one of the most active of any Apache project and a great way to get help quickly. + +## How To Follow Along + +If you want to follow along, you will require a computer with: + +* Java 8 +* Maven + +A provided Flink Maven Archetype will quickly create a skeleton project with all the necessary dependencies: + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight bash %} +$ mvn archetype:generate \ + -DarchetypeGroupId=org.apache.flink \ + -DarchetypeArtifactId=flink-walkthrough-table-java \{% unless site.is_stable %} + -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \{% endunless %} + -DarchetypeVersion={{ site.version }} \ + -DgroupId=spend-report \ + -DartifactId=spend-report \ + -Dversion=0.1 \ + -Dpackage=spendreport \ + -DinteractiveMode=false +{% endhighlight %} +</div> +<div data-lang="scala" markdown="1"> +{% highlight bash %} +$ mvn archetype:generate \ + -DarchetypeGroupId=org.apache.flink \ + -DarchetypeArtifactId=flink-walkthrough-table-scala \{% unless site.is_stable %} + -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \{% endunless %} + -DarchetypeVersion={{ site.version }} \ + -DgroupId=spend-report \ + -DartifactId=spend-report \ + -Dversion=0.1 \ + -Dpackage=spendreport \ + -DinteractiveMode=false +{% endhighlight %} +</div> +</div> + +{% unless site.is_stable %} +<p style="border-radius: 5px; padding: 5px" class="bg-danger"> + <b>Note</b>: For Maven 3.0 or higher, it is no longer possible to specify the repository (-DarchetypeCatalog) via the command line. If you wish to use the snapshot repository, you need to add a repository entry to your settings.xml.
For details about this change, please refer to <a href="http://maven.apache.org/archetype/maven-archetype-plugin/archetype-repository.html">Maven's official documentation</a>. +</p> +{% endunless %} + +You can edit the `groupId`, `artifactId`, and `package` if you like. With the above parameters, +Maven will create a project with all the dependencies required to complete this tutorial. +After importing the project into your editor, you will see a file with the following code, which you can run directly inside your IDE. + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); +BatchTableEnvironment tEnv = BatchTableEnvironment.create(env); + +tEnv.registerTableSource("transactions", new BoundedTransactionTableSource()); +tEnv.registerTableSink("spend_report", new SpendReportTableSink()); +tEnv.registerFunction("truncateDateToHour", new TruncateDateToHour()); + +tEnv + .scan("transactions") + .insertInto("spend_report"); + +env.execute("Spend Report"); +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +val env = ExecutionEnvironment.getExecutionEnvironment +val tEnv = BatchTableEnvironment.create(env) + +tEnv.registerTableSource("transactions", new BoundedTransactionTableSource) +tEnv.registerTableSink("spend_report", new SpendReportTableSink) + +val truncateDateToHour = new TruncateDateToHour + +tEnv + .scan("transactions") + .insertInto("spend_report") + +env.execute("Spend Report") +{% endhighlight %} +</div> +</div> + +## Breaking Down The Code + +#### The Execution Environment + +The first two lines set up your `ExecutionEnvironment`. +The execution environment is how you can set properties for your Job, specify whether you are writing a batch or a streaming application, and create your sources. +This walkthrough begins with the batch environment since you are building a periodic batch report. +It is then wrapped in a `BatchTableEnvironment` to have full access to the Table API. + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); +BatchTableEnvironment tEnv = BatchTableEnvironment.create(env); +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +val env = ExecutionEnvironment.getExecutionEnvironment +val tEnv = BatchTableEnvironment.create(env) +{% endhighlight %} +</div> +</div> + + +#### Registering Tables + +Next, tables are registered in the execution environment; you can use them to connect to external systems for reading and writing both batch and streaming data. +A table source provides access to data stored in external systems, such as a database, a key-value store, a message queue, or a file system. +A table sink emits a table to an external storage system. +Depending on the type of source and sink, they support different formats such as CSV, JSON, Avro, or Parquet.
+ +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +tEnv.registerTableSource("transactions", new BoundedTransactionTableSource()); +tEnv.registerTableSink("spend_report", new SpendReportTableSink()); +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +tEnv.registerTableSource("transactions", new BoundedTransactionTableSource) +tEnv.registerTableSink("spend_report", new SpendReportTableSink) +{% endhighlight %} +</div> +</div> + +Two tables are registered: a transaction input table and a spend report output table. +The transactions (`transactions`) table lets us read credit card transactions, which contain account IDs (`accountId`), timestamps (`timestamp`), and US$ amounts (`amount`). +In this tutorial, the table is backed by data generated in memory to avoid any dependencies on external systems. +In practice, the `BoundedTransactionTableSource` may be backed by a filesystem, a database, or any other static source. +The spend report (`spend_report`) table logs each row with log level **INFO**, instead of writing to persistent storage, so you can easily see your results. + +#### Registering A UDF + +Along with the tables, a [user-defined function]({{ site.baseurl }}/dev/table/udfs.html) is registered for working with timestamps. +This function takes a timestamp and rounds it down to the nearest hour. + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +tEnv.registerFunction("truncateDateToHour", new TruncateDateToHour()); +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +val truncateDateToHour = new TruncateDateToHour +{% endhighlight %} +</div> +</div> + +#### The Query + +With the environment configured and tables registered, you are ready to build your first application. +From the `TableEnvironment` you can `scan` an input table to read its rows and then write those results into an output table using `insertInto`. + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +tEnv + .scan("transactions") + .insertInto("spend_report"); +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +tEnv + .scan("transactions") + .insertInto("spend_report") +{% endhighlight %} +</div> +</div> + +Initially, the Job reads all transactions and logs them out with log level **INFO**. + +#### Execute + +Flink applications are built lazily and shipped to the cluster for execution only once fully formed. +You begin the execution of your Job by calling `ExecutionEnvironment#execute` and giving it a name. + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +env.execute("Spend Report"); +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +env.execute("Spend Report") +{% endhighlight %} +</div> +</div> + +## Attempt One + +Now, with the skeleton of a Job set up, you are ready to add some business logic. +The goal is to build a report that shows the total spend for each account across each hour of the day. +Just like a SQL query, Flink can select the required fields and group by your keys. +Because the timestamp field has millisecond granularity, you can use the UDF to round it down to the nearest hour.
+Finally, select all the fields, summing the total spend per account-hour pair with the built-in `sum` [aggregate function]({{ site.baseurl }}/dev/table/functions.html#aggregate-functions). + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +tEnv + .scan("transactions") + .select("accountId, timestamp.truncateDateToHour as timestamp, amount") + .groupBy("accountId, timestamp") + .select("accountId, timestamp, amount.sum as total") + .insertInto("spend_report"); +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +tEnv + .scan("transactions") + .select('accountId, truncateDateToHour('timestamp) as 'timestamp, 'amount) + .groupBy('accountId, 'timestamp) + .select('accountId, 'timestamp, 'amount.sum as 'total) + .insertInto("spend_report") +{% endhighlight %} +</div> +</div> + +This query consumes all records from the `transactions` table, calculates the report, and outputs the results in an efficient, scalable manner. + +{% highlight raw %} +# Query 1 output showing account id, timestamp, and amount + +> 1, 2019-01-01 00:00:00.0, $567.87 +> 2, 2019-01-01 00:00:00.0, $726.23 +> 1, 2019-01-01 01:00:00.0, $686.87 +> 2, 2019-01-01 01:00:00.0, $810.06 +> 1, 2019-01-01 02:00:00.0, $859.35 +> 2, 2019-01-01 02:00:00.0, $458.40 +> 1, 2019-01-01 03:00:00.0, $330.85 +> 2, 2019-01-01 03:00:00.0, $730.02 +> 1, 2019-01-01 04:00:00.0, $585.16 +> 2, 2019-01-01 04:00:00.0, $760.76 +{% endhighlight %} + +## Adding Windows + +Grouping data based on time is a typical operation in data processing, especially when working with infinite streams. +A grouping based on time is called a [window]({{ site.baseurl }}/dev/stream/operators/windows.html) and Flink offers flexible windowing semantics. +The most basic type of window is called a `Tumble` window, which has a fixed size and whose buckets do not overlap. + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +tEnv + .scan("transactions") + .window(Tumble.over("1.hour").on("timestamp").as("w")) + .groupBy("accountId, w") + .select("accountId, w.start as timestamp, amount.sum") + .insertInto("spend_report"); +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +tEnv + .scan("transactions") + .window(Tumble over 1.hour on 'timestamp as 'w) + .groupBy('accountId, 'w) + .select('accountId, 'w.start as 'timestamp, 'amount.sum) + .insertInto("spend_report") +{% endhighlight %} +</div> +</div> + +This defines your application as using one-hour tumbling windows based on the timestamp column. +So a row with timestamp `2019-06-01 01:23:47` is put in the `2019-06-01 01:00:00` window. + +Aggregations based on time are unique because time, as opposed to other attributes, generally moves forward in a continuous streaming application. +In a batch context, windows offer a convenient API for grouping records by a timestamp attribute. + +Running the updated query will produce the same results as before. + +{% highlight raw %} +# Query 2 output showing account id, timestamp, and amount + +> 1, 2019-01-01 00:00:00.0, $567.87 +> 2, 2019-01-01 00:00:00.0, $726.23 +> 1, 2019-01-01 01:00:00.0, $686.87 +> 2, 2019-01-01 01:00:00.0, $810.06 +> 1, 2019-01-01 02:00:00.0, $859.35 +> 2, 2019-01-01 02:00:00.0, $458.40 +> 1, 2019-01-01 03:00:00.0, $330.85 +> 2, 2019-01-01 03:00:00.0, $730.02 +> 1, 2019-01-01 04:00:00.0, $585.16 +> 2, 2019-01-01 04:00:00.0, $760.76 +{% endhighlight %} + +## Once More, With Streaming!
+ +Because Flink's Table API offers consistent syntax and semantics for both batch and streaming, migrating from one to the other requires just two steps. + +The first step is to replace the batch `ExecutionEnvironment` with its streaming counterpart, `StreamExecutionEnvironment`, which creates a continuous streaming Job. +It includes stream-specific configurations, such as the time characteristic, which, when set to [event time]({{ site.baseurl }}/dev/event_time.html), guarantees consistent results even when faced with out-of-order events or a Job failure. +This is what will be used by your `Tumble` window when grouping records. + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + +StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + +val tEnv = StreamTableEnvironment.create(env) +{% endhighlight %} +</div> +</div> + +The second step is to migrate from a bounded data source to an infinite data source. +The project comes with an `UnboundedTransactionTableSource` that continuously creates transaction events in real time. +Similar to the `BoundedTransactionTableSource`, this table is backed by data generated in memory to avoid any dependencies on external systems. +In practice, this table might read from a streaming source such as Apache Kafka, AWS Kinesis, or Pravega. + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +tEnv.registerTableSource("transactions", new UnboundedTransactionTableSource()); +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +tEnv.registerTableSource("transactions", new UnboundedTransactionTableSource) +{% endhighlight %} +</div> +</div> + +And that's it, a fully functional, stateful, distributed streaming application! +The query continuously consumes the stream of transactions, computes the hourly spending, and emits results as soon as they are ready. +Since the input is unbounded, the query keeps running until it is manually stopped. +Because the Job uses time window-based aggregations, Flink can perform specific optimizations, such as state cleanup, when the framework knows that no more records will arrive for a particular window.
+ +{% highlight raw %} +# Query 3 output showing account id, timestamp, and amount + +# These rows are calculated continuously over the hour +# and output immediately at the end of the hour +> 1, 2019-01-01 00:00:00.0, $567.87 +> 2, 2019-01-01 00:00:00.0, $726.23 + +# Flink begins computing these rows as soon as +# the first record for the window arrives +> 1, 2019-01-01 01:00:00.0, $686.87 +> 2, 2019-01-01 01:00:00.0, $810.06 + +{% endhighlight %} + +## Final Application + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +package spendreport; + +import org.apache.flink.walkthrough.common.table.SpendReportTableSink; +import org.apache.flink.walkthrough.common.table.UnboundedTransactionTableSource; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.Tumble; +import org.apache.flink.table.api.java.StreamTableEnvironment; + +public class SpendReport { + + public static void main(String[] args) throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + + tEnv.registerTableSource("transactions", new UnboundedTransactionTableSource()); + tEnv.registerTableSink("spend_report", new SpendReportTableSink()); + + tEnv + .scan("transactions") + .window(Tumble.over("1.hour").on("timestamp").as("w")) + .groupBy("accountId, w") + .select("accountId, w.start as timestamp, amount.sum") + .insertInto("spend_report"); + + env.execute("Spend Report"); + } +} +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +package spendreport + +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.table.api.Tumble +import org.apache.flink.table.api.scala._ +import org.apache.flink.walkthrough.common.table._ + +object SpendReport { + + def main(args: Array[String]): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + + val tEnv = StreamTableEnvironment.create(env) + + tEnv.registerTableSource("transactions", new UnboundedTransactionTableSource) + tEnv.registerTableSink("spend_report", new SpendReportTableSink) + + tEnv + .scan("transactions") + .window(Tumble over 1.hour on 'timestamp as 'w) + .groupBy('accountId, 'w) + .select('accountId, 'w.start as 'timestamp, 'amount.sum) + .insertInto("spend_report") + + env.execute("Spend Report") + } +} +{% endhighlight %} +</div> +</div> + diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh index b2578fc..653803a 100755 --- a/flink-end-to-end-tests/run-nightly-tests.sh +++ b/flink-end-to-end-tests/run-nightly-tests.sh @@ -136,6 +136,9 @@ run_test "Elasticsearch (v6.3.1) sink end-to-end test" "$END_TO_END_DIR/test-scr run_test "Quickstarts Java nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_quickstarts.sh java" run_test "Quickstarts Scala nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_quickstarts.sh scala" +run_test "Walkthrough Table Java nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_table_walkthroughs.sh java" +run_test "Walkthrough Table Scala nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_table_walkthroughs.sh
scala" + +run_test "Test PubSub connector with Docker based Google PubSub Emulator" "$END_TO_END_DIR/test-scripts/test_streaming_gcp_pubsub.sh" + run_test "Avro Confluent Schema Registry nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_confluent_schema_registry.sh" diff --git a/flink-end-to-end-tests/test-scripts/test_table_walkthroughs.sh b/flink-end-to-end-tests/test-scripts/test_table_walkthroughs.sh new file mode 100755 index 0000000..77afc58 --- /dev/null +++ b/flink-end-to-end-tests/test-scripts/test_table_walkthroughs.sh @@ -0,0 +1,74 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# End-to-end test for the table walkthrough archetypes. +# Usage: +# FLINK_DIR=<flink dir> flink-end-to-end-tests/test-scripts/test_table_walkthroughs.sh <Type (java or scala)> + +source "$(dirname "$0")"/common.sh + +TEST_TYPE=$1 + +mkdir -p "${TEST_DATA_DIR}" +cd "${TEST_DATA_DIR}" + +ARTIFACT_ID=flink-walkthrough-table-${TEST_TYPE} +ARTIFACT_VERSION=0.1 + +mvn archetype:generate \ + -DarchetypeGroupId=org.apache.flink \ + -DarchetypeArtifactId=flink-walkthrough-table-${TEST_TYPE} \ + -DarchetypeVersion=${FLINK_VERSION} \ + -DgroupId=org.apache.flink.walkthrough \ + -DartifactId=${ARTIFACT_ID} \ + -Dversion=${ARTIFACT_VERSION} \ + -Dpackage=org.apache.flink.walkthrough \ + -DinteractiveMode=false + +cd "${ARTIFACT_ID}" + +mvn clean package -nsu > compile-output.txt + +if [[ `grep -c "BUILD FAILURE" compile-output.txt` -eq '1' ]]; then + echo "Failure: The walk-through did not successfully compile" + exit 1 +fi + +cd target +jar tvf ${ARTIFACT_ID}-${ARTIFACT_VERSION}.jar > contentsInJar.txt + +if [[ `grep -c "org/apache/flink/api/java" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/api" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/experimental" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/runtime" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/util" contentsInJar.txt` -eq '0' ]]; then + + echo "Success: No Flink core classes are contained in the jar." +else + echo "Failure: Flink core classes are contained in the jar."
+ exit 1 +fi + +TEST_PROGRAM_JAR=${TEST_DATA_DIR}/${ARTIFACT_ID}/target/${ARTIFACT_ID}-${ARTIFACT_VERSION}.jar + +add_optional_lib "table" + +start_cluster + +${FLINK_DIR}/bin/flink run "$TEST_PROGRAM_JAR" diff --git a/flink-walkthroughs/flink-walkthrough-common/pom.xml b/flink-walkthroughs/flink-walkthrough-common/pom.xml new file mode 100644 index 0000000..714d344 --- /dev/null +++ b/flink-walkthroughs/flink-walkthrough-common/pom.xml @@ -0,0 +1,59 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. + +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-walkthroughs</artifactId> + <version>1.10-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId> + <name>flink-walkthrough-common</name> + + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-common</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-planner_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + </dependencies> +</project> diff --git a/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/entity/Transaction.java b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/entity/Transaction.java new file mode 100644 index 0000000..606d388 --- /dev/null +++ b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/entity/Transaction.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.walkthrough.common.entity; + +import java.util.Objects; + +/** + * A simple transaction. + */ +@SuppressWarnings("unused") +public final class Transaction { + + private long accountId; + + private long timestamp; + + private double amount; + + public Transaction() { } + + public Transaction(long accountId, long timestamp, double amount) { + this.accountId = accountId; + this.timestamp = timestamp; + this.amount = amount; + } + + public long getAccountId() { + return accountId; + } + + public void setAccountId(long accountId) { + this.accountId = accountId; + } + + public long getTimestamp() { + return timestamp; + } + + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + + public double getAmount() { + return amount; + } + + public void setAmount(double amount) { + this.amount = amount; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } else if (o == null || getClass() != o.getClass()) { + return false; + } + Transaction that = (Transaction) o; + return accountId == that.accountId && + timestamp == that.timestamp && + Double.compare(that.amount, amount) == 0; + } + + @Override + public int hashCode() { + return Objects.hash(accountId, timestamp, amount); + } + + @Override + public String toString() { + return "Transaction{" + + "accountId=" + accountId + + ", timestamp=" + timestamp + + ", amount=" + amount + + '}'; + } +} diff --git a/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/sink/LoggerOutputFormat.java b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/sink/LoggerOutputFormat.java new file mode 100644 index 0000000..fee3a82 --- /dev/null +++ b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/sink/LoggerOutputFormat.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.walkthrough.common.sink; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.io.OutputFormat; +import org.apache.flink.configuration.Configuration; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A basic output format that logs all records at level <b>INFO</b>. 
+ */ +@Internal +public class LoggerOutputFormat implements OutputFormat<String> { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(LoggerOutputFormat.class); + + @Override + public void configure(Configuration parameters) { } + + @Override + public void open(int taskNumber, int numTasks) { } + + @Override + public void writeRecord(String record) { + LOG.info(record); + } + + @Override + public void close() { } +} diff --git a/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/source/TransactionIterator.java b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/source/TransactionIterator.java new file mode 100644 index 0000000..0f87f45 --- /dev/null +++ b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/source/TransactionIterator.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.walkthrough.common.source; + +import org.apache.flink.walkthrough.common.entity.Transaction; + +import java.io.Serializable; +import java.sql.Timestamp; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; + +/** + * An iterator of transaction events. 
+ */ +final class TransactionIterator implements Iterator<Transaction>, Serializable { + + private static final long serialVersionUID = 1L; + + private static final Timestamp INITIAL_TIMESTAMP = Timestamp.valueOf("2019-01-01 00:00:00"); + + private static final long SIX_MINUTES = 6 * 60 * 1000; + + private final boolean bounded; + + private int index = 0; + + private long timestamp; + + static TransactionIterator bounded() { + return new TransactionIterator(true); + } + + static TransactionIterator unbounded() { + return new TransactionIterator(false); + } + + private TransactionIterator(boolean bounded) { + this.bounded = bounded; + this.timestamp = INITIAL_TIMESTAMP.getTime(); + } + + @Override + public boolean hasNext() { + if (index < data.size()) { + return true; + } else if (!bounded) { + index = 0; + return true; + } else { + return false; + } + } + + @Override + public Transaction next() { + Transaction transaction = data.get(index++); + transaction.setTimestamp(timestamp); + timestamp += SIX_MINUTES; + return transaction; + } + + private static List<Transaction> data = Arrays.asList( + new Transaction(1, 0L, 188.23), + new Transaction(2, 0L, 374.79), + new Transaction(3, 0L, 112.15), + new Transaction(4, 0L, 478.75), + new Transaction(5, 0L, 208.85), + new Transaction(1, 0L, 379.64), + new Transaction(2, 0L, 351.44), + new Transaction(3, 0L, 320.75), + new Transaction(4, 0L, 259.42), + new Transaction(5, 0L, 273.44), + new Transaction(1, 0L, 267.25), + new Transaction(2, 0L, 397.15), + new Transaction(3, 0L, 0.219), + new Transaction(4, 0L, 231.94), + new Transaction(5, 0L, 384.73), + new Transaction(1, 0L, 419.62), + new Transaction(2, 0L, 412.91), + new Transaction(3, 0L, 0.77), + new Transaction(4, 0L, 22.10), + new Transaction(5, 0L, 377.54), + new Transaction(1, 0L, 375.44), + new Transaction(2, 0L, 230.18), + new Transaction(3, 0L, 0.80), + new Transaction(4, 0L, 350.89), + new Transaction(5, 0L, 127.55), + new Transaction(1, 0L, 483.91), + new Transaction(2, 0L, 228.22), + new Transaction(3, 0L, 871.15), + new Transaction(4, 0L, 64.19), + new Transaction(5, 0L, 79.43), + new Transaction(1, 0L, 56.12), + new Transaction(2, 0L, 256.48), + new Transaction(3, 0L, 148.16), + new Transaction(4, 0L, 199.95), + new Transaction(5, 0L, 252.37), + new Transaction(1, 0L, 274.73), + new Transaction(2, 0L, 473.54), + new Transaction(3, 0L, 119.92), + new Transaction(4, 0L, 323.59), + new Transaction(5, 0L, 353.16), + new Transaction(1, 0L, 211.90), + new Transaction(2, 0L, 280.93), + new Transaction(3, 0L, 347.89), + new Transaction(4, 0L, 459.86), + new Transaction(5, 0L, 82.31), + new Transaction(1, 0L, 373.26), + new Transaction(2, 0L, 479.83), + new Transaction(3, 0L, 454.25), + new Transaction(4, 0L, 83.64), + new Transaction(5, 0L, 292.44)); +} diff --git a/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/source/TransactionRowInputFormat.java b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/source/TransactionRowInputFormat.java new file mode 100644 index 0000000..5acb17e --- /dev/null +++ b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/source/TransactionRowInputFormat.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.walkthrough.common.source; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.io.GenericInputFormat; +import org.apache.flink.api.common.io.NonParallelInput; +import org.apache.flink.core.io.GenericInputSplit; +import org.apache.flink.types.Row; +import org.apache.flink.walkthrough.common.entity.Transaction; + +import java.sql.Timestamp; +import java.util.Iterator; + +/** + * A bounded input of transactions. + */ +@Internal +public class TransactionRowInputFormat extends GenericInputFormat<Row> implements NonParallelInput { + + private static final long serialVersionUID = 1L; + + private transient Iterator<Transaction> transactions; + + @Override + public void open(GenericInputSplit split) { + transactions = TransactionIterator.bounded(); + } + + @Override + public boolean reachedEnd() { + return !transactions.hasNext(); + } + + @Override + public Row nextRecord(Row reuse) { + Transaction transaction = transactions.next(); + reuse.setField(0, transaction.getAccountId()); + reuse.setField(1, new Timestamp(transaction.getTimestamp())); + reuse.setField(2, transaction.getAmount()); + + return reuse; + } +} diff --git a/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/source/TransactionSource.java b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/source/TransactionSource.java new file mode 100644 index 0000000..d882b1c --- /dev/null +++ b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/source/TransactionSource.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.walkthrough.common.source; + +import org.apache.flink.annotation.Public; +import org.apache.flink.streaming.api.functions.source.FromIteratorFunction; +import org.apache.flink.walkthrough.common.entity.Transaction; + +import java.io.Serializable; +import java.util.Iterator; + +/** + * A stream of transactions.
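+ *
+ * <p>A usage sketch against the DataStream API (environment setup assumed):
+ * <pre>{@code
+ * StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ * DataStream<Transaction> transactions = env.addSource(new TransactionSource());
+ * }</pre>
+ *
+ * <p>The wrapped iterator sleeps 100 ms per record, throttling the stream to roughly
+ * ten transactions per second.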
+ */ +@Public +public class TransactionSource extends FromIteratorFunction<Transaction> { + + private static final long serialVersionUID = 1L; + + public TransactionSource() { + super(new RateLimitedIterator<>(TransactionIterator.unbounded())); + } + + private static class RateLimitedIterator<T> implements Iterator<T>, Serializable { + + private static final long serialVersionUID = 1L; + + private final Iterator<T> inner; + + private RateLimitedIterator(Iterator<T> inner) { + this.inner = inner; + } + + @Override + public boolean hasNext() { + return inner.hasNext(); + } + + @Override + public T next() { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return inner.next(); + } + } +} diff --git a/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/table/BoundedTransactionTableSource.java b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/table/BoundedTransactionTableSource.java new file mode 100644 index 0000000..e51a341 --- /dev/null +++ b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/table/BoundedTransactionTableSource.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.walkthrough.common.table; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.sources.InputFormatTableSource; +import org.apache.flink.table.types.DataType; +import org.apache.flink.types.Row; +import org.apache.flink.walkthrough.common.source.TransactionRowInputFormat; + +/** + * A table source for reading a bounded set of transaction events. + * + * <p>This could be backed by a table, database, or other static data set. 
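+ *
+ * <p>A registration sketch for a batch table environment, mirroring how the walkthrough
+ * skeleton wires it up (environment setup assumed):
+ * <pre>{@code
+ * BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
+ * tEnv.registerTableSource("transactions", new BoundedTransactionTableSource());
+ * Table transactions = tEnv.scan("transactions"); // accountId, timestamp, amount
+ * }</pre>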
+ */ +@PublicEvolving +@SuppressWarnings({"deprecation", "unused"}) +public class BoundedTransactionTableSource extends InputFormatTableSource<Row> { + @Override + public InputFormat<Row, ?> getInputFormat() { + return new TransactionRowInputFormat(); + } + + @Override + public DataType getProducedDataType() { + return getTableSchema().toRowDataType(); + } + + @Override + public TableSchema getTableSchema() { + return TableSchema.builder() + .field("accountId", Types.LONG) + .field("timestamp", Types.SQL_TIMESTAMP) + .field("amount", Types.DOUBLE) + .build(); + } +} diff --git a/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/table/SpendReportTableSink.java b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/table/SpendReportTableSink.java new file mode 100644 index 0000000..6c85717 --- /dev/null +++ b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/table/SpendReportTableSink.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.walkthrough.common.table; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.sinks.AppendStreamTableSink; +import org.apache.flink.table.sinks.BatchTableSink; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.table.types.DataType; +import org.apache.flink.types.Row; +import org.apache.flink.walkthrough.common.sink.LoggerOutputFormat; + +/** + * A simple table sink for writing to stdout. 
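+ *
+ * <p>A registration sketch (table environment assumed; the sink serves both batch and
+ * streaming queries):
+ * <pre>{@code
+ * tEnv.registerTableSink("spend_report", new SpendReportTableSink());
+ * transactions.insertInto("spend_report"); // rows logged as "accountId, timestamp, $amount"
+ * }</pre>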
+ */ +@PublicEvolving +@SuppressWarnings("deprecation") +public class SpendReportTableSink implements AppendStreamTableSink<Row>, BatchTableSink<Row> { + + private final TableSchema schema; + + public SpendReportTableSink() { + this.schema = TableSchema + .builder() + .field("accountId", Types.LONG) + .field("timestamp", Types.SQL_TIMESTAMP) + .field("amount", Types.DOUBLE) + .build(); + } + + @Override + public void emitDataSet(DataSet<Row> dataSet) { + dataSet + .map(SpendReportTableSink::format) + .output(new LoggerOutputFormat()); + } + + @Override + public void emitDataStream(DataStream<Row> dataStream) { + dataStream + .map(SpendReportTableSink::format) + .writeUsingOutputFormat(new LoggerOutputFormat()); + } + + @Override + public TableSchema getTableSchema() { + return schema; + } + + @Override + public DataType getConsumedDataType() { + return getTableSchema().toRowDataType(); + } + + @Override + public String[] getFieldNames() { + return getTableSchema().getFieldNames(); + } + + @Override + public TypeInformation<?>[] getFieldTypes() { + return getTableSchema().getFieldTypes(); + } + + @Override + public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) { + return this; + } + + private static String format(Row row) { + //noinspection MalformedFormatString + return String.format("%s, %s, $%.2f", row.getField(0), row.getField(1), row.getField(2)); + } +} diff --git a/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/table/TruncateDateToHour.java b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/table/TruncateDateToHour.java new file mode 100644 index 0000000..d9f35b9 --- /dev/null +++ b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/table/TruncateDateToHour.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.walkthrough.common.table; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.table.functions.ScalarFunction; + +/** + * A user defined function for rounding timestamps down to + * the nearest hour. 
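+ *
+ * <p>A usage sketch in Table API expression syntax, using the registration name from the
+ * walkthrough skeleton (surrounding setup assumed):
+ * <pre>{@code
+ * tEnv.registerFunction("truncateDateToHour", new TruncateDateToHour());
+ * transactions.select("accountId, truncateDateToHour(timestamp) as timestamp, amount");
+ * }</pre>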
+ */ +@PublicEvolving +@SuppressWarnings("unused") +public class TruncateDateToHour extends ScalarFunction { + + private static final long serialVersionUID = 1L; + + private static final long ONE_HOUR = 60 * 60 * 1000; + + public long eval(long timestamp) { + return timestamp - (timestamp % ONE_HOUR); + } + + @Override + public TypeInformation<?> getResultType(Class<?>[] signature) { + return Types.SQL_TIMESTAMP; + } +} diff --git a/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/table/UnboundedTransactionTableSource.java b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/table/UnboundedTransactionTableSource.java new file mode 100644 index 0000000..f6114da --- /dev/null +++ b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/table/UnboundedTransactionTableSource.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.walkthrough.common.table; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.sources.DefinedRowtimeAttributes; +import org.apache.flink.table.sources.RowtimeAttributeDescriptor; +import org.apache.flink.table.sources.StreamTableSource; +import org.apache.flink.table.sources.tsextractors.ExistingField; +import org.apache.flink.table.sources.wmstrategies.BoundedOutOfOrderTimestamps; +import org.apache.flink.table.types.DataType; +import org.apache.flink.types.Row; +import org.apache.flink.walkthrough.common.entity.Transaction; +import org.apache.flink.walkthrough.common.source.TransactionSource; + +import java.sql.Timestamp; +import java.util.Collections; +import java.util.List; + +/** + * A table source for reading an unbounded set of transactions. + * + * <p>This table could be backed by a message queue or other streaming data source. 
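+ *
+ * <p>A streaming registration sketch (environment setup assumed):
+ * <pre>{@code
+ * StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+ * tEnv.registerTableSource("transactions", new UnboundedTransactionTableSource());
+ * }</pre>
+ *
+ * <p>The {@code timestamp} field doubles as the rowtime attribute, with watermarks that
+ * tolerate up to 100 ms of out-of-order events (see the descriptor below).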
+ */ +@PublicEvolving +@SuppressWarnings({"deprecation", "unused"}) +public class UnboundedTransactionTableSource + implements StreamTableSource<Row>, + DefinedRowtimeAttributes { + + @Override + public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) { + return execEnv + .addSource(new TransactionSource()) + .map(transactionRowMapFunction()) + .returns(getTableSchema().toRowType()); + } + + private MapFunction<Transaction, Row> transactionRowMapFunction() { + return transaction -> Row.of( + transaction.getAccountId(), + new Timestamp(transaction.getTimestamp()), + transaction.getAmount()); + } + + @Override + public DataType getProducedDataType() { + return getTableSchema().toRowDataType(); + } + + @Override + public TableSchema getTableSchema() { + return TableSchema.builder() + .field("accountId", Types.LONG) + .field("timestamp", Types.SQL_TIMESTAMP) + .field("amount", Types.DOUBLE) + .build(); + } + + @Override + public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() { + return Collections.singletonList( + new RowtimeAttributeDescriptor( + "timestamp", + new ExistingField("timestamp"), + new BoundedOutOfOrderTimestamps(100))); + } +} diff --git a/docs/getting-started/tutorials/index.md b/flink-walkthroughs/flink-walkthrough-table-java/pom.xml similarity index 54% copy from docs/getting-started/tutorials/index.md copy to flink-walkthroughs/flink-walkthrough-table-java/pom.xml index 9b0c8f4..8834701 100644 --- a/docs/getting-started/tutorials/index.md +++ b/flink-walkthroughs/flink-walkthrough-table-java/pom.xml @@ -1,10 +1,3 @@ ---- -title: "Tutorials" -nav-id: tutorials -nav-title: '<i class="fa fa-power-off title appetizer" aria-hidden="true"></i> Tutorials' -nav-parent_id: getting-started -nav-pos: 1 ---- <!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file @@ -23,3 +16,22 @@ KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + </properties> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-walkthroughs</artifactId> + <version>1.10-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-walkthrough-table-java</artifactId> + <packaging>maven-archetype</packaging> + +</project> diff --git a/docs/getting-started/tutorials/index.md b/flink-walkthroughs/flink-walkthrough-table-java/src/main/resources/META-INF/maven/archetype-metadata.xml similarity index 52% copy from docs/getting-started/tutorials/index.md copy to flink-walkthroughs/flink-walkthrough-table-java/src/main/resources/META-INF/maven/archetype-metadata.xml index 9b0c8f4..c0e8806 100644 --- a/docs/getting-started/tutorials/index.md +++ b/flink-walkthroughs/flink-walkthrough-table-java/src/main/resources/META-INF/maven/archetype-metadata.xml @@ -1,10 +1,3 @@ ---- -title: "Tutorials" -nav-id: tutorials -nav-title: '<i class="fa fa-power-off title appetizer" aria-hidden="true"></i> Tutorials' -nav-parent_id: getting-started -nav-pos: 1 ---- <!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. 
See the NOTICE file @@ -23,3 +16,21 @@ KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. --> + +<archetype-descriptor + xmlns="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0 http://maven.apache.org/xsd/archetype-descriptor-1.0.0.xsd" + name="flink-walkthrough-table-java"> + <fileSets> + <fileSet filtered="true" packaged="true" encoding="UTF-8"> + <directory>src/main/java</directory> + <includes> + <include>**/*.java</include> + </includes> + </fileSet> + <fileSet encoding="UTF-8"> + <directory>src/main/resources</directory> + </fileSet> + </fileSets> +</archetype-descriptor> diff --git a/flink-walkthroughs/flink-walkthrough-table-java/src/main/resources/archetype-resources/pom.xml b/flink-walkthroughs/flink-walkthrough-table-java/src/main/resources/archetype-resources/pom.xml new file mode 100644 index 0000000..81fdd66 --- /dev/null +++ b/flink-walkthroughs/flink-walkthrough-table-java/src/main/resources/archetype-resources/pom.xml @@ -0,0 +1,263 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <groupId>${groupId}</groupId> + <artifactId>${artifactId}</artifactId> + <version>${version}</version> + <packaging>jar</packaging> + + <name>Flink Walkthrough Table Java</name> + <url>https://flink.apache.org</url> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <flink.version>@project.version@</flink.version> + <java.version>1.8</java.version> + <scala.binary.version>2.11</scala.binary.version> + <maven.compiler.source>${java.version}</maven.compiler.source> + <maven.compiler.target>${java.version}</maven.compiler.target> + </properties> + + <repositories> + <repository> + <id>apache.snapshots</id> + <name>Apache Development Snapshot Repository</name> + <url>https://repository.apache.org/content/repositories/snapshots/</url> + <releases> + <enabled>false</enabled> + </releases> + <snapshots> + <enabled>true</enabled> + </snapshots> + </repository> + </repositories> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + </dependency> + + <!-- These dependencies are provided, because they should not be packaged into the JAR file. 
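+ Flink provides these classes at runtime, so bundling them would only bloat the application
+ JAR; the add-dependencies-for-IDEA profile further below restores them at compile scope for
+ local runs inside the IDE.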
--> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-java</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <!-- Table ecosystem --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-planner_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <!-- Add connector dependencies here. They must be in the default scope (compile). --> + + <!-- Example: + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + </dependency> + --> + + <!-- Add logging framework, to produce console output when running in the IDE. --> + <!-- These dependencies are excluded from the application JAR by default. --> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <version>1.7.7</version> + <scope>runtime</scope> + </dependency> + <dependency> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + <version>1.2.17</version> + <scope>runtime</scope> + </dependency> + </dependencies> + + <build> + <plugins> + + <!-- Java Compiler --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.1</version> + <configuration> + <source>${java.version}</source> + <target>${java.version}</target> + </configuration> + </plugin> + + <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. --> + <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>3.0.0</version> + <executions> + <!-- Run shade goal on package phase --> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <artifactSet> + <excludes> + <exclude>org.apache.flink:force-shading</exclude> + <exclude>com.google.code.findbugs:jsr305</exclude> + <exclude>org.slf4j:*</exclude> + <exclude>log4j:*</exclude> + </excludes> + </artifactSet> + <filters> + <filter> + <!-- Do not copy the signatures in the META-INF folder. + Otherwise, this might cause SecurityExceptions when using the JAR. --> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + </filters> + <transformers> + <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> + <mainClass>${package}.SpendReport</mainClass> + </transformer> + </transformers> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + + <pluginManagement> + <plugins> + + <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. 
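+ It tells m2e's lifecycle mapping to ignore the shade and compiler goals listed below,
+ rather than flagging them as unmappable in Eclipse's incremental build.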
--> + <plugin> + <groupId>org.eclipse.m2e</groupId> + <artifactId>lifecycle-mapping</artifactId> + <version>1.0.0</version> + <configuration> + <lifecycleMappingMetadata> + <pluginExecutions> + <pluginExecution> + <pluginExecutionFilter> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <versionRange>[3.0.0,)</versionRange> + <goals> + <goal>shade</goal> + </goals> + </pluginExecutionFilter> + <action> + <ignore/> + </action> + </pluginExecution> + <pluginExecution> + <pluginExecutionFilter> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <versionRange>[3.1,)</versionRange> + <goals> + <goal>testCompile</goal> + <goal>compile</goal> + </goals> + </pluginExecutionFilter> + <action> + <ignore/> + </action> + </pluginExecution> + </pluginExecutions> + </lifecycleMappingMetadata> + </configuration> + </plugin> + </plugins> + </pluginManagement> + </build> + + <!-- This profile helps to make things run out of the box in IntelliJ --> + <!-- It adds Flink's core classes to the runtime class path. --> + <!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' --> + <profiles> + <profile> + <id>add-dependencies-for-IDEA</id> + + <activation> + <property> + <name>idea.version</name> + </property> + </activation> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-java</artifactId> + <version>${flink.version}</version> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-planner_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>compile</scope> + </dependency> + </dependencies> + </profile> + </profiles> + +</project> diff --git a/flink-walkthroughs/flink-walkthrough-table-java/src/main/resources/archetype-resources/src/main/java/SpendReport.java b/flink-walkthroughs/flink-walkthrough-table-java/src/main/resources/archetype-resources/src/main/java/SpendReport.java new file mode 100644 index 0000000..a3911ed --- /dev/null +++ b/flink-walkthroughs/flink-walkthrough-table-java/src/main/resources/archetype-resources/src/main/java/SpendReport.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package ${package}; + +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.table.api.java.BatchTableEnvironment; +import org.apache.flink.walkthrough.common.table.SpendReportTableSink; +import org.apache.flink.walkthrough.common.table.BoundedTransactionTableSource; +import org.apache.flink.walkthrough.common.table.TruncateDateToHour; + +/** + * Skeleton code for the table walkthrough. + */ +public class SpendReport { + public static void main(String[] args) throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + BatchTableEnvironment tEnv = BatchTableEnvironment.create(env); + + tEnv.registerTableSource("transactions", new BoundedTransactionTableSource()); + tEnv.registerTableSink("spend_report", new SpendReportTableSink()); + tEnv.registerFunction("truncateDateToHour", new TruncateDateToHour()); + + tEnv + .scan("transactions") + .insertInto("spend_report"); + + env.execute("Spend Report"); + } +} diff --git a/flink-walkthroughs/flink-walkthrough-table-java/src/main/resources/archetype-resources/src/main/resources/log4j.properties b/flink-walkthroughs/flink-walkthrough-table-java/src/main/resources/archetype-resources/src/main/resources/log4j.properties new file mode 100644 index 0000000..8deec36 --- /dev/null +++ b/flink-walkthroughs/flink-walkthrough-table-java/src/main/resources/archetype-resources/src/main/resources/log4j.properties @@ -0,0 +1,24 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +log4j.rootLogger=Warn, console +log4j.logger.org.apache.flink.walkthrough.common.sink.LoggerOutputFormat=info + +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n diff --git a/docs/getting-started/tutorials/index.md b/flink-walkthroughs/flink-walkthrough-table-scala/pom.xml similarity index 54% copy from docs/getting-started/tutorials/index.md copy to flink-walkthroughs/flink-walkthrough-table-scala/pom.xml index 9b0c8f4..d67a59c 100644 --- a/docs/getting-started/tutorials/index.md +++ b/flink-walkthroughs/flink-walkthrough-table-scala/pom.xml @@ -1,10 +1,3 @@ ---- -title: "Tutorials" -nav-id: tutorials -nav-title: '<i class="fa fa-power-off title appetizer" aria-hidden="true"></i> Tutorials' -nav-parent_id: getting-started -nav-pos: 1 ---- <!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file @@ -23,3 +16,22 @@ KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License. --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + </properties> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-walkthroughs</artifactId> + <version>1.10-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-walkthrough-table-scala</artifactId> + <packaging>maven-archetype</packaging> + +</project> diff --git a/docs/getting-started/tutorials/index.md b/flink-walkthroughs/flink-walkthrough-table-scala/src/main/resources/META-INF/maven/archetype-metadata.xml similarity index 52% copy from docs/getting-started/tutorials/index.md copy to flink-walkthroughs/flink-walkthrough-table-scala/src/main/resources/META-INF/maven/archetype-metadata.xml index 9b0c8f4..977e5ca 100644 --- a/docs/getting-started/tutorials/index.md +++ b/flink-walkthroughs/flink-walkthrough-table-scala/src/main/resources/META-INF/maven/archetype-metadata.xml @@ -1,10 +1,3 @@ ---- -title: "Tutorials" -nav-id: tutorials -nav-title: '<i class="fa fa-power-off title appetizer" aria-hidden="true"></i> Tutorials' -nav-parent_id: getting-started -nav-pos: 1 ---- <!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file @@ -23,3 +16,21 @@ KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. --> + +<archetype-descriptor + xmlns="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0 http://maven.apache.org/xsd/archetype-descriptor-1.0.0.xsd" + name="flink-walkthrough-table-scala"> + <fileSets> + <fileSet filtered="true" packaged="true" encoding="UTF-8"> + <directory>src/main/scala</directory> + <includes> + <include>**/*.scala</include> + </includes> + </fileSet> + <fileSet encoding="UTF-8"> + <directory>src/main/resources</directory> + </fileSet> + </fileSets> +</archetype-descriptor> diff --git a/flink-walkthroughs/flink-walkthrough-table-scala/src/main/resources/archetype-resources/pom.xml b/flink-walkthroughs/flink-walkthrough-table-scala/src/main/resources/archetype-resources/pom.xml new file mode 100644 index 0000000..ca9a181 --- /dev/null +++ b/flink-walkthroughs/flink-walkthrough-table-scala/src/main/resources/archetype-resources/pom.xml @@ -0,0 +1,300 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. 
See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <groupId>${groupId}</groupId> + <artifactId>${artifactId}</artifactId> + <version>${version}</version> + <packaging>jar</packaging> + + <name>Flink Walkthrough Table Scala</name> + <url>https://flink.apache.org</url> + + <repositories> + <repository> + <id>apache.snapshots</id> + <name>Apache Development Snapshot Repository</name> + <url>https://repository.apache.org/content/repositories/snapshots/</url> + <releases> + <enabled>false</enabled> + </releases> + <snapshots> + <enabled>true</enabled> + </snapshots> + </repository> + </repositories> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <flink.version>@project.version@</flink.version> + <scala.binary.version>2.11</scala.binary.version> + <scala.version>2.11.12</scala.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + </dependency> + + <!-- Apache Flink dependencies --> + <!-- These dependencies are provided, because they should not be packaged into the JAR file. --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-scala_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <!-- Table ecosystem --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-planner_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <!-- Add connector dependencies here. They must be in the default scope (compile). --> + + <!-- Example: + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + </dependency> + --> + + <!-- Add logging framework, to produce console output when running in the IDE. --> + <!-- These dependencies are excluded from the application JAR by default. --> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <version>1.7.7</version> + <scope>runtime</scope> + </dependency> + <dependency> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + <version>1.2.17</version> + <scope>runtime</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. --> + <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. 
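+ For example, a hypothetical entry point org.example.MyJob would be declared as
+ <mainClass>org.example.MyJob</mainClass>.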
--> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>3.0.0</version> + <executions> + <!-- Run shade goal on package phase --> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <artifactSet> + <excludes> + <exclude>org.apache.flink:force-shading</exclude> + <exclude>com.google.code.findbugs:jsr305</exclude> + <exclude>org.slf4j:*</exclude> + <exclude>log4j:*</exclude> + </excludes> + </artifactSet> + <filters> + <filter> + <!-- Do not copy the signatures in the META-INF folder. + Otherwise, this might cause SecurityExceptions when using the JAR. --> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + </filters> + <transformers> + <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> + <mainClass>${package}.SpendReport</mainClass> + </transformer> + </transformers> + </configuration> + </execution> + </executions> + </plugin> + + <!-- Java Compiler --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.1</version> + <configuration> + <source>1.8</source> + <target>1.8</target> + </configuration> + </plugin> + + <!-- Scala Compiler --> + <plugin> + <groupId>net.alchim31.maven</groupId> + <artifactId>scala-maven-plugin</artifactId> + <version>3.2.2</version> + <executions> + <execution> + <goals> + <goal>compile</goal> + <goal>testCompile</goal> + </goals> + </execution> + </executions> + </plugin> + + <!-- Eclipse Scala Integration --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-eclipse-plugin</artifactId> + <version>2.8</version> + <configuration> + <downloadSources>true</downloadSources> + <projectnatures> + <projectnature>org.scala-ide.sdt.core.scalanature</projectnature> + <projectnature>org.eclipse.jdt.core.javanature</projectnature> + </projectnatures> + <buildcommands> + <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand> + </buildcommands> + <classpathContainers> + <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer> + <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer> + </classpathContainers> + <excludes> + <exclude>org.scala-lang:scala-library</exclude> + <exclude>org.scala-lang:scala-compiler</exclude> + </excludes> + <sourceIncludes> + <sourceInclude>**/*.scala</sourceInclude> + <sourceInclude>**/*.java</sourceInclude> + </sourceIncludes> + </configuration> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <version>1.7</version> + <executions> + <!-- Add src/main/scala to eclipse build path --> + <execution> + <id>add-source</id> + <phase>generate-sources</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <source>src/main/scala</source> + </sources> + </configuration> + </execution> + <!-- Add src/test/scala to eclipse build path --> + <execution> + <id>add-test-source</id> + <phase>generate-test-sources</phase> + <goals> + <goal>add-test-source</goal> + </goals> + <configuration> + <sources> + <source>src/test/scala</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + + <!-- This profile helps to make things run out of the box in IntelliJ --> + <!-- It adds Flink's core
classes to the runtime class path. --> + <!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' --> + <profiles> + <profile> + <id>add-dependencies-for-IDEA</id> + + <activation> + <property> + <name>idea.version</name> + </property> + </activation> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-scala_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-planner_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>compile</scope> + </dependency> + </dependencies> + </profile> + </profiles> + +</project> diff --git a/flink-walkthroughs/flink-walkthrough-table-scala/src/main/resources/archetype-resources/src/main/resources/log4j.properties b/flink-walkthroughs/flink-walkthrough-table-scala/src/main/resources/archetype-resources/src/main/resources/log4j.properties new file mode 100644 index 0000000..8deec36 --- /dev/null +++ b/flink-walkthroughs/flink-walkthrough-table-scala/src/main/resources/archetype-resources/src/main/resources/log4j.properties @@ -0,0 +1,24 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ + +log4j.rootLogger=Warn, console +log4j.logger.org.apache.flink.walkthrough.common.sink.LoggerOutputFormat=info + +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n diff --git a/flink-walkthroughs/flink-walkthrough-table-scala/src/main/resources/archetype-resources/src/main/scala/SpendReport.scala b/flink-walkthroughs/flink-walkthrough-table-scala/src/main/resources/archetype-resources/src/main/scala/SpendReport.scala new file mode 100644 index 0000000..47657b8 --- /dev/null +++ b/flink-walkthroughs/flink-walkthrough-table-scala/src/main/resources/archetype-resources/src/main/scala/SpendReport.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ${package} + +import org.apache.flink.api.scala._ +import org.apache.flink.table.api.scala._ +import org.apache.flink.walkthrough.common.table._ + +object SpendReport { + def main(args: Array[String]): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = BatchTableEnvironment.create(env) + + tEnv.registerTableSource("transactions", new BoundedTransactionTableSource) + tEnv.registerTableSink("spend_report", new SpendReportTableSink) + + val truncateDateToHour = new TruncateDateToHour + + tEnv + .scan("transactions") + .insertInto("spend_report") + + env.execute("Spend Report") + } +} diff --git a/flink-walkthroughs/pom.xml b/flink-walkthroughs/pom.xml new file mode 100644 index 0000000..2733f59 --- /dev/null +++ b/flink-walkthroughs/pom.xml @@ -0,0 +1,95 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-parent</artifactId> + <version>1.10-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-walkthroughs</artifactId> + <packaging>pom</packaging> + + <name>flink-walkthroughs</name> + + <modules> + <module>flink-walkthrough-common</module> + <module>flink-walkthrough-table-java</module> + <module>flink-walkthrough-table-scala</module> + </modules> + <build> + <extensions> + <extension> + <groupId>org.apache.maven.archetype</groupId> + <artifactId>archetype-packaging</artifactId> + <version>2.2</version> + </extension> + </extensions> + <pluginManagement> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-archetype-plugin</artifactId> + <version>2.2</version> + </plugin> + </plugins> + </pluginManagement> + <plugins> + <plugin> + <artifactId>maven-archetype-plugin</artifactId> + <version>2.2</version><!--$NO-MVN-MAN-VER$--> + <configuration> + <skip>true</skip> + </configuration> + </plugin> + <!-- deactivate the shade plugin for the walkthrough archetypes --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <phase/> + </execution> + </executions> + </plugin> + + <!-- use alternative delimiter for filtering resources --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-resources-plugin</artifactId> + <configuration> + <useDefaultDelimiters>false</useDefaultDelimiters> + <delimiters> + <delimiter>@</delimiter> + </delimiters> + </configuration> + </plugin> + </plugins> + <resources> + <resource> + <directory>src/main/resources</directory> + <filtering>true</filtering> + </resource> + </resources> + </build> +</project> diff --git a/pom.xml b/pom.xml index 6d77568..9300fef 100644 --- a/pom.xml +++ b/pom.xml @@ -86,6 +86,7 @@ under the License. <module>flink-docs</module> <module>flink-python</module> <module>flink-ml-parent</module> + <module>flink-walkthroughs</module> </modules> <properties>