fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] Document processing time Temporal Table Joins
URL: https://github.com/apache/flink/pull/6741#discussion_r222011035

##########
File path: docs/dev/table/streaming/temporal_tables.md
##########
@@ -0,0 +1,286 @@
+---
+title: "Temporal Tables"
+nav-parent_id: streaming_tableapi
+nav-pos: 4
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Temporal Tables represent a concept of a table that changes over time.
+Flink can keep track of those changes and allows for accessing the table's content at a certain point in time within a query
+
+* This will be replaced by the TOC
+{:toc}
+
+Motivation
+----------
+
+Let's assume that we have the following tables.
+
+{% highlight sql %}
+SELECT * FROM Orders;
+
+rowtime amount currency
+======= ====== =========
+10:15        2 Euro
+10:30        1 US Dollar
+10:32       50 Yen
+10:52        3 Euro
+11:04        5 US Dollar
+{% endhighlight %}
+
+`Orders` is an append-only table that represents payments for given `amount` and given `currency`.
+For example at `10:15` there was an order for an amount of `2 Euro`.
+
+{% highlight sql %}
+SELECT * FROM RatesHistory;
+
+rowtime currency   rate
+======= ======== ======
+09:00   US Dollar   102
+09:00   Euro        114
+09:00   Yen           1
+10:45   Euro        116
+11:15   Euro        119
+{% endhighlight %}
+
+`RatesHistory` represents an ever changing append-only stream of currency exchange rates, with respect to `Yen` (which has a rate of `1`).
+For example exchange rate for a period from `09:00` to `10:45` of `Euro` to `Yen` was `114`.
+From `10:45` to `11:15` it was `116`.
+
+Task is now to calculate a value of all of the `Orders` converted to common currency (`Yen`).
+For example we would like to convert the order
+{% highlight sql %}
+rowtime amount currency
+======= ====== =========
+10:15        2 Euro
+{% endhighlight %}
+using the appropriate conversion rate for the given `rowtime` (`114`).
+Without using Temporal Tables in order to do so, one would need to write such query:
+{% highlight sql %}
+SELECT
+  SUM(o.amount * r.rate) AS amount
+FROM Orders AS o,
+  RatesHistory AS r
+WHERE r.currency = o.currency
+AND r.rowtime = (
+  SELECT MAX(rowtime)
+  FROM Rates AS r2
+  WHERE r2.currency = o.currency
+  AND r2.rowtime <= o.rowtime);
+{% endhighlight %}
+Temporal Tables are a concept that aims to simplify this query,
+speed up it's execution and reduce state usage.
+
+In order to define a Temporal Table, we must define it's primary key,
+Primary key allows us to overwrite older values in the Temporal Table.
+In the above example `currency` would be a primary key for `RatesHistory` table.
+Secondly a [time attribute](time_attributes.html) is also required,
+that determines which row is newer and which one is older.
+
+Temporal Table Functions
+------------------------
+
+In order to access the data in the Temporal Table,
+one must pass a time attribute that determines the version of the table that will be returned.
+Flink uses the SQL syntax of Table Functions to provide a way to express it.
+Once defined, Temporal Table Function takes a single argument `timeAttribute` and returns a set of rows.
+This set contains the latest versions of the rows for all of existing primary keys with respect to the given `timeAttribute`.
+
+Assuming that we defined a `Rates(timeAttribute)` Temporal Table Function based on `RatesHistory` table.
+We could query such function in the following way:
+
+{% highlight sql %}
+SELECT * FROM Rates('10:15');
+
+rowtime currency   rate
+======= ======== ======
+09:00   US Dollar   102
+09:00   Euro        114
+09:00   Yen           1
+
+SELECT * FROM Rates('11:00');
+
+rowtime currency   rate
+======= ======== ======
+09:00   US Dollar   102
+10:45   Euro        116
+09:00   Yen           1
+{% endhighlight %}
+
+Each query to `Rates(timeAttribute)` would return the state of the `Rates` for the given `timeAttribute`:
+
+**Note**: Currently Flink doesn't support directly querying the Temporal Table Functions with a constant `timeAttribute`.
+At the moment Temporal Table Functions can only be used in joins.
+Above example was used to provide an intuition about what function `Rates(timeAttribute)` returns.
+
+Processing time

Review comment:
Do we need to distinguish between Processing Time and Event Time here?
Isn't everything exactly the same, except for the type of the attribute?

IMO, a highlighted paragraph that currently only processing time is supported should suffice.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services