Gerrrr commented on code in PR #443: URL: https://github.com/apache/flink-table-store/pull/443#discussion_r1053657989
##########
docs/content/docs/features/table-types.md:
##########
@@ -0,0 +1,142 @@
+---
+title: "Table Types"
+weight: 1
+type: docs
+aliases:
+- /features/table-types.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Table Types
+
+Table Store supports several types of tables. Users can set the `write-mode` table property to choose the table type when creating a table.
+
+## Changelog Tables with Primary Keys
+
+Changelog is the default table type. Users can also explicitly set `'write-mode' = 'change-log'` in the table properties when creating the table.
+
+A primary key is a set of columns whose values are unique for each record. Table Store imposes an ordering on the data, meaning that the system sorts records by primary key within each bucket. Thanks to this ordering, users can achieve high query performance by adding filter conditions on the primary key.
+
+By [defining primary keys]({{< ref "docs/sql-api/creating-tables#tables-with-primary-keys" >}}) on a changelog table, users gain access to the following features.
+
+### Merge Engines
+
+When a Table Store sink receives two or more records with the same primary key, it merges them into one record to keep the primary key unique. By specifying the `merge-engine` table property, users can choose how records are merged.
+
+#### Deduplicate
+
+`deduplicate` is the default merge engine. Table Store keeps only the latest record and discards any other records with the same primary key.
+
+Specifically, if the latest record is a `DELETE` record, all records with the same primary key are deleted.
+
+#### Partial Update
+
+By specifying `'merge-engine' = 'partial-update'`, users can assemble a complete record by setting its columns across multiple updates. Specifically, under the same primary key, each value field is updated to the latest data one by one, but null values never overwrite existing ones.
+
+For example, let's say Table Store receives three records `<1, 23.0, 10, NULL>`, `<1, NULL, NULL, 'This is a book'>` and `<1, 25.2, NULL, NULL>`, where the first column is the primary key. The final result will be `<1, 25.2, 10, 'This is a book'>`.
+
+NOTE: For streaming queries, the `partial-update` merge engine must be used together with the `full-compaction` [changelog producer]({{< ref "docs/features/table-types#changelog-producers" >}}).
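+
+For illustration, a table matching the example records above could be defined as follows. This is a hedged sketch: the table and column names are hypothetical, chosen to line up with the `<pk, price, number, detail>` records in the example.
+
+{{< tabs "partial-update-merge-engine-example" >}}
+
+{{< tab "Flink" >}}
+
+```sql
+-- Illustrative sketch of a partial-update table;
+-- names are placeholders matching the example records above.
+CREATE TABLE MyPartialUpdateTable (
+    pk BIGINT,
+    price DOUBLE,
+    number BIGINT,
+    detail STRING,
+    PRIMARY KEY (pk) NOT ENFORCED
+) WITH (
+    'merge-engine' = 'partial-update',
+    -- required for streaming queries, see the NOTE above
+    'changelog-producer' = 'full-compaction'
+);
+```
+
+{{< /tab >}}
+
+{{< /tabs >}}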
+
+#### Aggregation
+
+Sometimes users only care about aggregated results. The `aggregation` merge engine aggregates each value field with the latest data one by one under the same primary key, according to the field's aggregate function.
+
+Each field that is not part of the primary key must be given an aggregate function, specified by the `fields.<field-name>.aggregate-function` table property. For example, consider the following table definition.
+
+{{< tabs "aggregation-merge-engine-example" >}}
+
+{{< tab "Flink" >}}
+
+```sql
+CREATE TABLE MyTable (
+    product_id BIGINT,
+    price DOUBLE,
+    sales BIGINT,
+    PRIMARY KEY (product_id) NOT ENFORCED
+) WITH (
+    'merge-engine' = 'aggregation',
+    'fields.price.aggregate-function' = 'max',
+    'fields.sales.aggregate-function' = 'sum'
+);
+```
+
+{{< /tab >}}
+
+{{< /tabs >}}
+
+Field `price` will be aggregated by the `max` function, and field `sales` will be aggregated by the `sum` function. Given two input records `<1, 23.0, 15>` and `<1, 30.2, 20>`, the final result will be `<1, 30.2, 35>`.
+
+Currently supported aggregate functions and their data types are:
+
+* `sum`: supports DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT and DOUBLE.
+* `min`/`max`: support DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, DATE, TIME, TIMESTAMP and TIMESTAMP_LTZ.
+* `last_value` / `last_non_null_value`: support all data types.
+* `listagg`: supports the STRING data type.
+* `bool_and` / `bool_or`: support the BOOLEAN data type.
+
+### Changelog Producers
+
+Streaming queries continuously produce the latest changes. These changes can come from the underlying table files or from an [external log system]({{< ref "docs/features/external-log-systems" >}}) like Kafka. Compared to changes from an external log system, changes from table files have lower cost but higher latency (depending on how often snapshots are created).
+
+By specifying the `changelog-producer` table property when creating the table, users can choose the pattern of changes produced from files.

Review Comment:
   Does it mean that the `changelog-producer` option is only relevant to the changelog from files? Does it do anything when Kafka is used?

##########
docs/content/docs/sql-api/writing-tables.md:
##########
@@ -0,0 +1,79 @@
+---
+title: "Writing Tables"
+weight: 4
+type: docs
+aliases:
+- /sql-api/writing-tables.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Writing Tables
+
+## Applying Records/Changes to Tables
+
+{{< tabs "insert-into-example" >}}
+
+{{< tab "Flink" >}}
+
+Use `INSERT INTO` to apply records and changes to tables.
+
+```sql
+INSERT INTO MyTable SELECT ...
+```
+
+{{< /tab >}}
+
+{{< /tabs >}}
+
+## Overwriting the Whole Table
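+
+A sketch of the idea (assuming standard Flink `INSERT OVERWRITE` syntax; the table name and partition spec below are placeholders):
+
+{{< tabs "insert-overwrite-example" >}}
+
+{{< tab "Flink" >}}
+
+```sql
+-- Replace the entire contents of the table
+-- with the result of the query.
+INSERT OVERWRITE MyTable SELECT ...
+
+-- Replace a single partition, leaving
+-- other partitions untouched.
+INSERT OVERWRITE MyTable PARTITION (dt = '2022-12-20') SELECT ...
+```
+
+{{< /tab >}}
+
+{{< /tabs >}}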

Review Comment:
   What does it mean to overwrite a table or a partition?

##########
docs/content/docs/sql-api/querying-tables.md:
##########
@@ -24,99 +24,98 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# Query Table
-
-You can directly SELECT the table in batch runtime mode of Flink SQL.
-
-```sql
--- Batch mode, read latest snapshot
-SET 'execution.runtime-mode' = 'batch';
-SELECT * FROM MyTable;
-```
-
-## Query Engines
-
-Table Store not only supports Flink SQL queries natively but also provides
-queries from other popular engines.
-See [Engines]({{< ref "docs/engines/overview" >}})
-
-## Query Optimization
-
-It is highly recommended to specify partition and primary key filters
-along with the query, which will speed up the data skipping of the query.
-
-The filter functions that can accelerate data skipping are:
-- `=`
-- `<`
-- `<=`
-- `>`
-- `>=`
-- `IN (...)`
-- `LIKE 'abc%'`
-- `IS NULL`
-
-Table Store will sort the data by primary key, which speeds up the point queries
-and range queries. When using a composite primary key, it is best for the query
-filters to form a [leftmost prefix](https://dev.mysql.com/doc/refman/5.7/en/multiple-column-indexes.html)
-of the primary key for good acceleration.
-
-Suppose that a table has the following specification:
-
-```sql
-CREATE TABLE orders (
-    catalog_id BIGINT,
-    order_id BIGINT,
-    .....,
-    PRIMARY KEY (catalog_id, order_id) NOT ENFORCED -- composite primary key
-)
-```
-
-The query obtains a good acceleration by specifying a range filter for
-the leftmost prefix of the primary key.
-
-```sql
-SELECT * FROM orders WHERE catalog_id=1025;
-
-SELECT * FROM orders WHERE catalog_id=1025 AND order_id=29495;
-
-SELECT * FROM orders
-  WHERE catalog_id=1025
-  AND order_id>2035 AND order_id<6000;
-```
-
-However, the following filter cannot accelerate the query well.
-
-```sql
-SELECT * FROM orders WHERE order_id=29495;
-
-SELECT * FROM orders WHERE catalog_id=1025 OR order_id=29495;
-```
-
-## Snapshots Table
-
-You can query the snapshot history information of the table through Flink SQL.
+# Querying Tables
+
+Just like all other tables, Table Store tables can be queried with the `SELECT` statement.

Review Comment:
   As I mentioned above, it would be very helpful to understand under what situations FTS adds a normalization node to the query topology.

##########
docs/content/docs/maintenance-actions/write-performance.md:
##########
@@ -0,0 +1,118 @@
+---
+title: "Write Performance"
+weight: 1
+type: docs
+aliases:
+- /maintenance-actions/write-performance.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Write Performance
+
+The performance of Table Store writers is related to the following factors.
+
+## Parallelism
+
+It is recommended that the parallelism of the sink be less than or equal to the number of buckets, preferably equal to it. You can control the parallelism of the sink with the `sink.parallelism` table property.
+
+<table class="table table-bordered">
+    <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Option</th>
+      <th class="text-left" style="width: 5%">Required</th>
+      <th class="text-left" style="width: 5%">Default</th>
+      <th class="text-left" style="width: 10%">Type</th>
+      <th class="text-left" style="width: 60%">Description</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>sink.parallelism</h5></td>
+      <td>No</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Integer</td>
+      <td>Defines the parallelism of the sink operator. By default, the parallelism is determined by the framework using the same parallelism of the upstream chained operator.</td>
+    </tr>
+    </tbody>
+</table>
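+
+For example, a sketch of pinning the sink parallelism to the bucket count (the table name, columns, and values below are placeholders):
+
+```sql
+CREATE TABLE MyTable (
+    user_id BIGINT,
+    behavior STRING
+) WITH (
+    'bucket' = '4',
+    -- match the sink parallelism to the number of buckets
+    'sink.parallelism' = '4'
+);
+```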
+
+## Number of Sorted Runs to Trigger Compaction
+
+Table Store uses an LSM tree, which supports a large number of updates. An LSM tree organizes files into several "sorted runs". When querying records from an LSM tree, all sorted runs must be combined to produce a complete view of all records.

Review Comment:
   Is a sorted run a collection of data files that contain non-overlapping keys? Can a single LSM level contain more than a single sorted run? Can a single data file belong to more than one sorted run?

##########
docs/content/docs/features/external-log-systems.md:
##########
@@ -0,0 +1,64 @@
+---
+title: "External Log Systems"
+weight: 2
+type: docs
+aliases:
+- /features/external-log-systems.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# External Log Systems
+
+Aside from [underlying table files]({{< ref "docs/features/table-types#changelog-producers" >}}), the changelog of a Table Store table can also be stored in, and consumed from, an external log system such as Kafka. By specifying the `log.system` table property, users can choose which external log system to use.
+
+If an external log system is used, all records written into table files will also be written into the log system. Changes produced by streaming queries will then come from the log system instead of from table files.
+
+## Consistency Guarantees
+
+By default, changes in the log system become visible to consumers only after a snapshot, just like table files. This behavior guarantees exactly-once semantics; that is, consumers see each record exactly once.
+
+However, users can also specify the table property `'log.consistency' = 'eventual'`, so that the changelog written into the log system can be consumed immediately, without waiting for the next snapshot. This lowers the latency of the changelog, but due to possible failures it can only guarantee at-least-once semantics (that is, consumers might see duplicated records).
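+
+For example, a sketch of a table backed by Kafka with eventual consistency (the bootstrap server address and topic name are placeholders, and the `kafka.*` option keys assume the Kafka log system connector):
+
+```sql
+CREATE TABLE MyTable (
+    user_id BIGINT,
+    behavior STRING,
+    PRIMARY KEY (user_id) NOT ENFORCED
+) WITH (
+    'log.system' = 'kafka',
+    'kafka.bootstrap.servers' = 'localhost:9092',
+    'kafka.topic' = 'my-topic',
+    -- lower changelog latency at the cost of at-least-once delivery
+    'log.consistency' = 'eventual'
+);
+```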
Review Comment:
   It might be worth mentioning somewhere that `'log.consistency' = 'eventual'` adds a normalization node to streaming queries. In general, it would be very helpful to understand when FTS adds a normalization node in queries and insertions.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org