lirui-apache commented on a change in pull request #14145: URL: https://github.com/apache/flink/pull/14145#discussion_r527368062
########## File path: docs/dev/table/hive/hive_read_write.md ########## @@ -22,119 +22,199 @@ specific language governing permissions and limitations under the License. --> -Using the `HiveCatalog` and Flink's connector to Hive, Flink can read and write from Hive data as an alternative to Hive's batch engine. -Be sure to follow the instructions to include the correct [dependencies]({{ site.baseurl }}/dev/table/hive/#depedencies) in your application. -And please also note that Hive connector only works with blink planner. +Using the HiveCatalog, Apache Flink can be used for unified `BATCH` and `STREAMING` processing of Apache +Hive tables. This means Flink can be used as a more performant alternative to Hive’s batch engine, +or to continuously read and write data into and out of Hive tables to power real-time data +warehousing applications. + +<div class="alert alert-info"> + <b>IMPORTANT:</b> Reading and writing to and from Apache Hive is only supported by the Blink table planner. +</div> * This will be replaced by the TOC {:toc} -## Reading From Hive +## Reading -Assume Hive contains a single table in its `default` database, named people that contains several rows. +Flink supports reading data from Hive in both `BATCH` and `STREAMING` modes. When run as a `BATCH` +application, Flink will execute its query over the state of the table at the point in time when the +query is executed. `STREAMING` reads will continuously monitor the table and incrementally fetch +new data as it is made available. Flink will read tables as bounded by default. + +`STREAMING` reads support consuming both partitioned and non-partitioned tables. +For partitioned tables, Flink will monitor the generation of new partitions, and read +them incrementally when available. For non-partitioned tables, Flink will monitor the generation +of new files in the folder and read new files incrementally. 
+ +<table class="table table-bordered"> + <thead> + <tr> + <th class="text-left" style="width: 20%">Key</th> + <th class="text-left" style="width: 15%">Default</th> + <th class="text-left" style="width: 10%">Type</th> + <th class="text-left" style="width: 55%">Description</th> + </tr> + </thead> + <tbody> + <tr> + <td><h5>streaming-source.enable</h5></td> + <td style="word-wrap: break-word;">false</td> + <td>Boolean</td> + <td>Whether to enable the streaming source. NOTE: Please make sure that each partition/file is written atomically; otherwise the reader may get incomplete data.</td> + </tr> + <tr> + <td><h5>streaming-source.monitor-interval</h5></td> + <td style="word-wrap: break-word;">1 m</td> + <td>Duration</td> + <td>Time interval for continuously monitoring partitions/files.</td> + </tr> + <tr> + <td><h5>streaming-source.consume-order</h5></td> + <td style="word-wrap: break-word;">create-time</td> + <td>String</td> + <td>The consume order of the streaming source; supported values are create-time and partition-time. create-time compares the partition/file creation time; this is not the partition creation time in the Hive metastore, but the folder/file modification time in the filesystem. partition-time compares the time represented by the partition name; if the partition folder somehow gets updated, e.g. a new file is added into the folder, it can affect how the data is consumed. For non-partitioned tables, this value should always be 'create-time'.</td> + </tr> + <tr> + <td><h5>streaming-source.consume-start-offset</h5></td> + <td style="word-wrap: break-word;">1970-00-00</td> + <td>String</td> + <td>Start offset for streaming consuming. How to parse and compare offsets depends on the consume order. For create-time and partition-time, this should be a timestamp string (yyyy-[m]m-[d]d [hh:mm:ss]). 
For partition-time, the partition time extractor will be used to extract the time from the partition.</td> + </tr> + </tbody> +</table> + +[SQL Hints]({% link dev/table/sql/hints.md %}) can be used to apply configurations to a Hive table +without changing its definition in the Hive metastore. + +{% highlight sql %} + +SELECT * +FROM hive_table +/*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2020-05-20') */; -{% highlight bash %} -hive> show databases; -OK -default -Time taken: 0.841 seconds, Fetched: 1 row(s) - -hive> show tables; -OK -Time taken: 0.087 seconds - -hive> CREATE TABLE mytable(name string, value double); -OK -Time taken: 0.127 seconds - -hive> SELECT * FROM mytable; -OK -Tom 4.72 -John 8.0 -Tom 24.2 -Bob 3.14 -Bob 4.72 -Tom 34.9 -Mary 4.79 -Tiff 2.72 -Bill 4.33 -Mary 77.7 -Time taken: 0.097 seconds, Fetched: 10 row(s) {% endhighlight %} -With the data ready your can connect to Hive [connect to an existing Hive installation]({{ site.baseurl }}/dev/table/hive/#connecting-to-hive) and begin querying. +**Notes** -{% highlight bash %} +- The monitoring strategy is to scan all directories/files currently in the location path, so a large number of partitions may cause performance degradation. +- Streaming reads for non-partitioned tables require that each file be written atomically into the target directory. +- Streaming reads for partitioned tables require that each partition be added atomically, as seen by the Hive metastore. If not, new data added to an existing partition will be consumed. +- Streaming reads do not support watermark grammar in Flink DDL, so these tables cannot be used for window operators. 
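For illustration, the streaming options above can be combined in a single dynamic-table hint. This is only a sketch: it assumes a hypothetical partitioned Hive table named `fact_logs` whose partition names encode a date.

```sql
-- Incrementally consume a (hypothetical) partitioned Hive table, ordering
-- partitions by the time encoded in the partition name, starting from
-- 2020-05-20, and checking for new partitions every 10 seconds.
SELECT *
FROM fact_logs
/*+ OPTIONS(
    'streaming-source.enable' = 'true',
    'streaming-source.consume-order' = 'partition-time',
    'streaming-source.consume-start-offset' = '2020-05-20',
    'streaming-source.monitor-interval' = '10s'
) */;
```

Because the options live in the hint, the table's definition in the Hive metastore stays untouched, and the same table can still be read in `BATCH` mode by simply omitting them.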
-Flink SQL> show catalogs; -myhive -default_catalog +## Reading Hive Views -# ------ Set the current catalog to be 'myhive' catalog if you haven't set it in the yaml file ------ +Flink is able to read from Hive defined views, but some limitations apply: -Flink SQL> use catalog myhive; +1) The Hive catalog must be set as the current catalog before you can query the view. +This can be done by either `tableEnv.useCatalog(...)` in Table API or `USE CATALOG ...` in SQL Client. -# ------ See all registered database in catalog 'mytable' ------ +2) Hive and Flink SQL have different syntax, e.g. different reserved keywords and literals. +Make sure the view’s query is compatible with Flink grammar. -Flink SQL> show databases; -default +### Temporal Table Join Review comment: I wonder whether we should list all the source tables that can be used as temporal tables (maybe in [this page](https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/temporal_tables.html#temporal-table)). And then we can put a cross link here. It's annoying for users to have to check each connector doc to decide whether a table can be used in temporal join. And maybe users don't even expect to find such information in the "read/write" page. ########## File path: docs/dev/table/hive/hive_read_write.md ########## @@ -22,119 +22,199 @@ specific language governing permissions and limitations under the License. --> -Using the `HiveCatalog` and Flink's connector to Hive, Flink can read and write from Hive data as an alternative to Hive's batch engine. -Be sure to follow the instructions to include the correct [dependencies]({{ site.baseurl }}/dev/table/hive/#depedencies) in your application. -And please also note that Hive connector only works with blink planner. +Using the HiveCatalog, Apache Flink can be used for unified `BATCH` and STREAM processing of Apache +Hive Tables. 
This means Flink can be used as a more performant alternative to Hive’s batch engine, +or to continuously read and write data into and out of Hive tables to power real-time data +warehousing applications. + +<div class="alert alert-info"> + <b>IMPORTANT:</b> Reading and writing to and from Apache Hive is only supported by the Blink table planner. +</div> * This will be replaced by the TOC {:toc} -## Reading From Hive +## Reading -Assume Hive contains a single table in its `default` database, named people that contains several rows. +Flink supports reading data from Hive in both `BATCH` and `STREAMING` modes. When run as a `BATCH` +application, Flink will execute its query over the state of the table at the point in time when the +query is executed. `STREAMING` reads will continuously monitor the table and incrementally fetch +new data as it is made available. Flink will read tables as bounded by default. + +`STREAMING` reads support consuming both partitioned and non-partitioned tables. +For partitioned tables, Flink will monitor the generation of new partitions, and read +them incrementally when available. For non-partitioned tables, Flink will monitor the generation +of new files in the folder and read new files incrementally. + +<table class="table table-bordered"> + <thead> + <tr> + <th class="text-left" style="width: 20%">Key</th> + <th class="text-left" style="width: 15%">Default</th> + <th class="text-left" style="width: 10%">Type</th> + <th class="text-left" style="width: 55%">Description</th> + </tr> + </thead> + <tbody> + <tr> + <td><h5>streaming-source.enable</h5></td> + <td style="word-wrap: break-word;">false</td> + <td>Boolean</td> + <td>Enable streaming source or not. 
NOTE: Please make sure that each partition/file is written atomically; otherwise the reader may get incomplete data.</td> + </tr> + <tr> + <td><h5>streaming-source.monitor-interval</h5></td> + <td style="word-wrap: break-word;">1 m</td> + <td>Duration</td> + <td>Time interval for continuously monitoring partitions/files.</td> + </tr> + <tr> + <td><h5>streaming-source.consume-order</h5></td> + <td style="word-wrap: break-word;">create-time</td> + <td>String</td> + <td>The consume order of the streaming source; supported values are create-time and partition-time. create-time compares the partition/file creation time; this is not the partition creation time in the Hive metastore, but the folder/file modification time in the filesystem. partition-time compares the time represented by the partition name; if the partition folder somehow gets updated, e.g. a new file is added into the folder, it can affect how the data is consumed. For non-partitioned tables, this value should always be 'create-time'.</td> + </tr> + <tr> + <td><h5>streaming-source.consume-start-offset</h5></td> + <td style="word-wrap: break-word;">1970-00-00</td> + <td>String</td> + <td>Start offset for streaming consuming. How to parse and compare offsets depends on the consume order. For create-time and partition-time, this should be a timestamp string (yyyy-[m]m-[d]d [hh:mm:ss]). For partition-time, the partition time extractor will be used to extract the time from the partition.</td> + </tr> + </tbody> +</table> + +[SQL Hints]({% link dev/table/sql/hints.md %}) can be used to apply configurations to a Hive table +without changing its definition in the Hive metastore. 
+ +{% highlight sql %} + +SELECT * +FROM hive_table +/*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2020-05-20') */; -{% highlight bash %} -hive> show databases; -OK -default -Time taken: 0.841 seconds, Fetched: 1 row(s) - -hive> show tables; -OK -Time taken: 0.087 seconds - -hive> CREATE TABLE mytable(name string, value double); -OK -Time taken: 0.127 seconds - -hive> SELECT * FROM mytable; -OK -Tom 4.72 -John 8.0 -Tom 24.2 -Bob 3.14 -Bob 4.72 -Tom 34.9 -Mary 4.79 -Tiff 2.72 -Bill 4.33 -Mary 77.7 -Time taken: 0.097 seconds, Fetched: 10 row(s) {% endhighlight %} -With the data ready your can connect to Hive [connect to an existing Hive installation]({{ site.baseurl }}/dev/table/hive/#connecting-to-hive) and begin querying. +**Notes** -{% highlight bash %} +- Monitor strategy is to scan all directories/files currently in the location path. Many partitions may cause performance degradation. +- Streaming reads for non-partitioned tables requires that each file be written atomically into the target directory. +- Streaming reading for partitioned tables requires that each partition should be added atomically in the view of hive metastore. If not, new data added to an existing partition will be consumed. +- Streaming reads do not support watermark grammar in Flink DDL. These tables cannot be used for window operators. -Flink SQL> show catalogs; -myhive -default_catalog +## Reading Hive Views -# ------ Set the current catalog to be 'myhive' catalog if you haven't set it in the yaml file ------ +Flink is able to read from Hive defined views, but some limitations apply: -Flink SQL> use catalog myhive; +1) The Hive catalog must be set as the current catalog before you can query the view. +This can be done by either `tableEnv.useCatalog(...)` in Table API or `USE CATALOG ...` in SQL Client. -# ------ See all registered database in catalog 'mytable' ------ +2) Hive and Flink SQL have different syntax, e.g. 
different reserved keywords and literals. +Make sure the view’s query is compatible with Flink grammar. -Flink SQL> show databases; -default +### Temporal Table Join -# ------ See the previously registered table 'mytable' ------ +You can use a Hive table as a temporal table and join streaming data with it. Please follow +the [example]({% link dev/table/streaming/temporal_tables.md %}#temporal-table) to find +out how to join a temporal table. -Flink SQL> show tables; -mytable +When performing the join, the Hive table will be cached in Slot memory and each record from +the stream is joined against the table by key to decide whether a match is found. Using a Hive +table as a temporal table does not require any additional configuration. Optionally, you can +configure the TTL of the Hive table cache with the following property. After the cache expires, +the Hive table will be scanned again to load the latest data. -# ------ The table schema that Flink sees is the same that we created in Hive, two columns - name as string and value as double ------ -Flink SQL> describe mytable; -root - |-- name: name - |-- type: STRING - |-- name: value - |-- type: DOUBLE +<table class="table table-bordered"> + <thead> + <tr> + <th class="text-left" style="width: 20%">Key</th> + <th class="text-left" style="width: 15%">Default</th> + <th class="text-left" style="width: 10%">Type</th> + <th class="text-left" style="width: 55%">Description</th> + </tr> + </thead> + <tbody> + <tr> + <td><h5>lookup.join.cache.ttl</h5></td> + <td style="word-wrap: break-word;">60 min</td> + <td>Duration</td> + <td>The cache TTL (e.g. 10min) for the build table in lookup join. By default the TTL is 60 minutes.</td> + </tr> + </tbody> +</table> -# ------ Select from hive table or hive view ------ -Flink SQL> SELECT * FROM mytable; +**Notes**: - name value -__________ __________ +- Each joining subtask needs to keep its own cache of the Hive table. 
Please ensure the Hive table can fit into +the memory of a TM task slot. +- It is encouraged to set a relatively large value for `lookup.join.cache.ttl`. Otherwise, jobs +are prone to performance issues as the table needs to be updated and reloaded too frequently. +- Currently, Flink simply loads the whole Hive table whenever the cache needs to be refreshed. +There is no way to differentiate new data from old. - Tom 4.72 - John 8.0 - Tom 24.2 - Bob 3.14 - Bob 4.72 - Tom 34.9 - Mary 4.79 - Tiff 2.72 - Bill 4.33 - Mary 77.7 +### Vectorized Optimization upon Read +Flink will automatically use vectorized reads of Hive tables when the following conditions are met: + +- Format: ORC or Parquet. +- Columns do not contain complex data types (Hive types: List, Map, Struct, Union). + +This feature is enabled by default. +It may be disabled with the following configuration. + +{% highlight bash %} +table.exec.hive.fallback-mapred-reader=true {% endhighlight %} -### Querying Hive views +### Source Parallelism Inference -If you need to query Hive views, please note: +By default, Flink will infer the optimal parallelism for its Hive readers +based on the number of files, and the number of blocks in each file. -1. You have to use the Hive catalog as your current catalog before you can query views in that catalog. It can be done by either `tableEnv.useCatalog(...)` in Table API or `USE CATALOG ...` in SQL Client. -2. Hive and Flink SQL have different syntax, e.g. different reserved keywords and literals. Make sure the view's query is compatible with Flink grammar. +Flink allows you to flexibly configure the policy of parallelism inference. 
You can configure the +following parameters in `TableConfig` (note that these parameters affect all sources of the job): -## Writing To Hive +<table class="table table-bordered"> + <thead> + <tr> + <th class="text-left" style="width: 20%">Key</th> + <th class="text-left" style="width: 15%">Default</th> + <th class="text-left" style="width: 10%">Type</th> + <th class="text-left" style="width: 55%">Description</th> + </tr> + </thead> + <tbody> + <tr> + <td><h5>table.exec.hive.infer-source-parallelism</h5></td> + <td style="word-wrap: break-word;">true</td> + <td>Boolean</td> + <td>If true, source parallelism is inferred according to the number of splits. If false, the parallelism of the source is set by config.</td> + </tr> + <tr> + <td><h5>table.exec.hive.infer-source-parallelism.max</h5></td> + <td style="word-wrap: break-word;">1000</td> + <td>Integer</td> + <td>Sets the maximum inferred parallelism for the source operator.</td> + </tr> + </tbody> +</table> -Similarly, data can be written into hive using an `INSERT` clause. +## Writing -Consider there is an example table named "mytable" with two columns: name and age, in string and int type. +Flink supports writing data to Hive in both `BATCH` and `STREAMING` modes. When run as a `BATCH` +application, Flink will write to a Hive table, only making those records visible when the job finishes. +`BATCH` writes support both appending to and overwriting existing tables. -{% highlight bash %} +{% highlight sql %} # ------ INSERT INTO will append to the table or partition, keeping the existing data intact ------ Flink SQL> INSERT INTO mytable SELECT 'Tom', 25; # ------ INSERT OVERWRITE will overwrite any existing data in the table or partition ------ Flink SQL> INSERT OVERWRITE mytable SELECT 'Tom', 25; {% endhighlight %} -We support partitioned table too, Consider there is a partitioned table named myparttable with four columns: name, age, my_type and my_date, in types ...... my_type and my_date are the partition keys. 
+Data can also be inserted into a particular partitions. Review comment: ```suggestion Data can also be inserted into particular partitions. ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org