lirui-apache commented on a change in pull request #14145:
URL: https://github.com/apache/flink/pull/14145#discussion_r527368062



##########
File path: docs/dev/table/hive/hive_read_write.md
##########
@@ -22,119 +22,199 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-Using the `HiveCatalog` and Flink's connector to Hive, Flink can read and 
write from Hive data as an alternative to Hive's batch engine.
-Be sure to follow the instructions to include the correct [dependencies]({{ 
site.baseurl }}/dev/table/hive/#depedencies) in your application.
-And please also note that Hive connector only works with blink planner.
+Using the `HiveCatalog`, Apache Flink can be used for unified `BATCH` and `STREAMING` processing of Apache Hive tables.
+This means Flink can be used as a more performant alternative to Hive's batch engine, or to continuously read and write
+data into and out of Hive tables to power real-time data warehousing applications.
+
+<div class="alert alert-info">
+   <b>IMPORTANT:</b> Reading and writing to and from Apache Hive is only 
supported by the Blink table planner.
+</div>
 
 * This will be replaced by the TOC
 {:toc}
 
-## Reading From Hive
+## Reading
 
-Assume Hive contains a single table in its `default` database, named people 
that contains several rows.
+Flink supports reading data from Hive in both `BATCH` and `STREAMING` modes. 
When run as a `BATCH`
+application, Flink will execute its query over the state of the table at the 
point in time when the
+query is executed. `STREAMING` reads will continuously monitor the table and 
incrementally fetch
+new data as it is made available. Flink will read tables as bounded by default.
+
+`STREAMING` reads support consuming both partitioned and non-partitioned 
tables. 
+For partitioned tables, Flink will monitor the generation of new partitions, 
and read
+them incrementally when available. For non-partitioned tables, Flink will 
monitor the generation
+of new files in the folder and read new files incrementally.
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+        <th class="text-left" style="width: 20%">Key</th>
+        <th class="text-left" style="width: 15%">Default</th>
+        <th class="text-left" style="width: 10%">Type</th>
+        <th class="text-left" style="width: 55%">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td><h5>streaming-source.enable</h5></td>
+        <td style="word-wrap: break-word;">false</td>
+        <td>Boolean</td>
+        <td>Enable streaming source or not. NOTE: Please make sure that each partition/file is written atomically, otherwise the reader may get incomplete data.</td>
+    </tr>
+    <tr>
+        <td><h5>streaming-source.monitor-interval</h5></td>
+        <td style="word-wrap: break-word;">1 m</td>
+        <td>Duration</td>
+        <td>Time interval for consecutively monitoring partitions/files.</td>
+    </tr>
+    <tr>
+        <td><h5>streaming-source.consume-order</h5></td>
+        <td style="word-wrap: break-word;">create-time</td>
+        <td>String</td>
+        <td>The consume order of the streaming source. Supported values are 'create-time' and 'partition-time'. 'create-time' compares the partition/file creation time; this is not the partition creation time in the Hive metastore, but the folder/file modification time in the filesystem. 'partition-time' compares the time represented by the partition name; if the partition folder somehow gets updated, e.g. a new file is added into the folder, it can affect how the data is consumed. For non-partitioned tables, this value should always be 'create-time'.</td>
+    </tr>
+    <tr>
+        <td><h5>streaming-source.consume-start-offset</h5></td>
+        <td style="word-wrap: break-word;">1970-00-00</td>
+        <td>String</td>
+        <td>Start offset for streaming consumption. How to parse and compare offsets depends on the consume order. For both 'create-time' and 'partition-time', the offset should be a timestamp string (yyyy-[m]m-[d]d [hh:mm:ss]). For 'partition-time', the partition time extractor is used to extract the time from the partition name.</td>
+    </tr>
+  </tbody>
+</table>
+
+[SQL Hints]({% link dev/table/sql/hints.md %}) can be used to apply 
configurations to a Hive table
+without changing its definition in the Hive metastore.
+
+{% highlight sql %}
+
+SELECT * 
+FROM hive_table 
+/*+ OPTIONS('streaming-source.enable'='true', 
'streaming-source.consume-start-offset'='2020-05-20') */;
 
-{% highlight bash %}
-hive> show databases;
-OK
-default
-Time taken: 0.841 seconds, Fetched: 1 row(s)
-
-hive> show tables;
-OK
-Time taken: 0.087 seconds
-
-hive> CREATE TABLE mytable(name string, value double);
-OK
-Time taken: 0.127 seconds
-
-hive> SELECT * FROM mytable;
-OK
-Tom   4.72
-John  8.0
-Tom   24.2
-Bob   3.14
-Bob   4.72
-Tom   34.9
-Mary  4.79
-Tiff  2.72
-Bill  4.33
-Mary  77.7
-Time taken: 0.097 seconds, Fetched: 10 row(s)
 {% endhighlight %}
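
Non-partitioned tables can be consumed as a stream in the same way. The sketch below assumes a
hypothetical non-partitioned table named `hive_logs` whose files are written atomically into its
directory, and shortens the monitor interval so new files are picked up faster:

{% highlight sql %}
-- 'hive_logs' is an illustrative, non-partitioned Hive table; new files appearing in its
-- folder are read incrementally, with the directory scanned every 10 seconds.
SELECT * FROM hive_logs /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.monitor-interval'='10 s') */;
{% endhighlight %}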
 
-With the data ready your can connect to Hive [connect to an existing Hive 
installation]({{ site.baseurl }}/dev/table/hive/#connecting-to-hive) and begin 
querying. 
+**Notes**
 
-{% highlight bash %}
+- The monitoring strategy is to scan all directories/files currently in the location path. A large number of partitions may cause performance degradation.
+- Streaming reads for non-partitioned tables require that each file be written atomically into the target directory.
+- Streaming reads for partitioned tables require that each partition be added atomically, as seen by the Hive metastore. If not, new data added to an existing partition will be consumed.
+- Streaming reads do not support watermark grammar in Flink DDL, so these tables cannot be used for window operators.
 
-Flink SQL> show catalogs;
-myhive
-default_catalog
+## Reading Hive Views
 
-# ------ Set the current catalog to be 'myhive' catalog if you haven't set it 
in the yaml file ------
+Flink is able to read from Hive-defined views, but some limitations apply:
 
-Flink SQL> use catalog myhive;
+1) The Hive catalog must be set as the current catalog before you can query 
the view. 
+This can be done by either `tableEnv.useCatalog(...)` in Table API or `USE 
CATALOG ...` in SQL Client.
 
-# ------ See all registered database in catalog 'mytable' ------
+2) Hive and Flink SQL have different syntax, e.g. different reserved keywords 
and literals.
+Make sure the view’s query is compatible with Flink grammar.
 
-Flink SQL> show databases;
-default
+### Temporal Table Join

Review comment:
       I wonder whether we should list all the source tables that can be used 
as temporal tables (maybe in [this 
page](https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/temporal_tables.html#temporal-table)).
 And then we can put a cross link here. It's annoying for users to have to check each connector doc to decide whether a table can be used in a temporal join. And maybe users don't even expect to find such information in the "read/write" page.

##########
File path: docs/dev/table/hive/hive_read_write.md
##########
@@ -22,119 +22,199 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-Using the `HiveCatalog` and Flink's connector to Hive, Flink can read and 
write from Hive data as an alternative to Hive's batch engine.
-Be sure to follow the instructions to include the correct [dependencies]({{ 
site.baseurl }}/dev/table/hive/#depedencies) in your application.
-And please also note that Hive connector only works with blink planner.
+Using the `HiveCatalog`, Apache Flink can be used for unified `BATCH` and `STREAMING` processing of Apache Hive tables.
+This means Flink can be used as a more performant alternative to Hive's batch engine, or to continuously read and write
+data into and out of Hive tables to power real-time data warehousing applications.
+
+<div class="alert alert-info">
+   <b>IMPORTANT:</b> Reading and writing to and from Apache Hive is only 
supported by the Blink table planner.
+</div>
 
 * This will be replaced by the TOC
 {:toc}
 
-## Reading From Hive
+## Reading
 
-Assume Hive contains a single table in its `default` database, named people 
that contains several rows.
+Flink supports reading data from Hive in both `BATCH` and `STREAMING` modes. 
When run as a `BATCH`
+application, Flink will execute its query over the state of the table at the 
point in time when the
+query is executed. `STREAMING` reads will continuously monitor the table and 
incrementally fetch
+new data as it is made available. Flink will read tables as bounded by default.
+
+`STREAMING` reads support consuming both partitioned and non-partitioned 
tables. 
+For partitioned tables, Flink will monitor the generation of new partitions, 
and read
+them incrementally when available. For non-partitioned tables, Flink will 
monitor the generation
+of new files in the folder and read new files incrementally.
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+        <th class="text-left" style="width: 20%">Key</th>
+        <th class="text-left" style="width: 15%">Default</th>
+        <th class="text-left" style="width: 10%">Type</th>
+        <th class="text-left" style="width: 55%">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td><h5>streaming-source.enable</h5></td>
+        <td style="word-wrap: break-word;">false</td>
+        <td>Boolean</td>
+        <td>Enable streaming source or not. NOTE: Please make sure that each partition/file is written atomically, otherwise the reader may get incomplete data.</td>
+    </tr>
+    <tr>
+        <td><h5>streaming-source.monitor-interval</h5></td>
+        <td style="word-wrap: break-word;">1 m</td>
+        <td>Duration</td>
+        <td>Time interval for consecutively monitoring partitions/files.</td>
+    </tr>
+    <tr>
+        <td><h5>streaming-source.consume-order</h5></td>
+        <td style="word-wrap: break-word;">create-time</td>
+        <td>String</td>
+        <td>The consume order of the streaming source. Supported values are 'create-time' and 'partition-time'. 'create-time' compares the partition/file creation time; this is not the partition creation time in the Hive metastore, but the folder/file modification time in the filesystem. 'partition-time' compares the time represented by the partition name; if the partition folder somehow gets updated, e.g. a new file is added into the folder, it can affect how the data is consumed. For non-partitioned tables, this value should always be 'create-time'.</td>
+    </tr>
+    <tr>
+        <td><h5>streaming-source.consume-start-offset</h5></td>
+        <td style="word-wrap: break-word;">1970-00-00</td>
+        <td>String</td>
+        <td>Start offset for streaming consumption. How to parse and compare offsets depends on the consume order. For both 'create-time' and 'partition-time', the offset should be a timestamp string (yyyy-[m]m-[d]d [hh:mm:ss]). For 'partition-time', the partition time extractor is used to extract the time from the partition name.</td>
+    </tr>
+  </tbody>
+</table>
+
+[SQL Hints]({% link dev/table/sql/hints.md %}) can be used to apply 
configurations to a Hive table
+without changing its definition in the Hive metastore.
+
+{% highlight sql %}
+
+SELECT * 
+FROM hive_table 
+/*+ OPTIONS('streaming-source.enable'='true', 
'streaming-source.consume-start-offset'='2020-05-20') */;
 
-{% highlight bash %}
-hive> show databases;
-OK
-default
-Time taken: 0.841 seconds, Fetched: 1 row(s)
-
-hive> show tables;
-OK
-Time taken: 0.087 seconds
-
-hive> CREATE TABLE mytable(name string, value double);
-OK
-Time taken: 0.127 seconds
-
-hive> SELECT * FROM mytable;
-OK
-Tom   4.72
-John  8.0
-Tom   24.2
-Bob   3.14
-Bob   4.72
-Tom   34.9
-Mary  4.79
-Tiff  2.72
-Bill  4.33
-Mary  77.7
-Time taken: 0.097 seconds, Fetched: 10 row(s)
 {% endhighlight %}
 
-With the data ready your can connect to Hive [connect to an existing Hive 
installation]({{ site.baseurl }}/dev/table/hive/#connecting-to-hive) and begin 
querying. 
+**Notes**
 
-{% highlight bash %}
+- The monitoring strategy is to scan all directories/files currently in the location path. A large number of partitions may cause performance degradation.
+- Streaming reads for non-partitioned tables require that each file be written atomically into the target directory.
+- Streaming reads for partitioned tables require that each partition be added atomically, as seen by the Hive metastore. If not, new data added to an existing partition will be consumed.
+- Streaming reads do not support watermark grammar in Flink DDL, so these tables cannot be used for window operators.
 
-Flink SQL> show catalogs;
-myhive
-default_catalog
+## Reading Hive Views
 
-# ------ Set the current catalog to be 'myhive' catalog if you haven't set it 
in the yaml file ------
+Flink is able to read from Hive-defined views, but some limitations apply:
 
-Flink SQL> use catalog myhive;
+1) The Hive catalog must be set as the current catalog before you can query the view.
+This can be done by either `tableEnv.useCatalog(...)` in the Table API or `USE CATALOG ...` in the SQL Client, as shown in the example after this list.
 
-# ------ See all registered database in catalog 'mytable' ------
+2) Hive and Flink SQL have different syntax, e.g. different reserved keywords 
and literals.
+Make sure the view’s query is compatible with Flink grammar.
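
For example (the catalog and view names below are purely illustrative), a Hive view can be queried
from the SQL Client like this:

{% highlight sql %}
-- 'myhive' and 'my_view' are hypothetical names; the view itself must be defined in Hive
-- and its query must be expressible in Flink SQL.
USE CATALOG myhive;
SELECT * FROM my_view;
{% endhighlight %}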
 
-Flink SQL> show databases;
-default
+### Temporal Table Join
 
-# ------ See the previously registered table 'mytable' ------
+You can use a Hive table as a temporal table and join streaming data with it. 
Please follow
+the [example]({% link dev/table/streaming/temporal_tables.md 
%}#temporal-table) to find
+out how to join a temporal table.
 
-Flink SQL> show tables;
-mytable
+When performing the join, the Hive table will be cached in the task slot's memory and each record from
+the stream is joined against the table by key to decide whether a match is found. Using a Hive
+table as a temporal table does not require any additional configuration. Optionally, you can
+configure the TTL of the Hive table cache with the following property. After the cache expires,
+the Hive table will be scanned again to load the latest data.
 
-# ------ The table schema that Flink sees is the same that we created in Hive, 
two columns - name as string and value as double ------ 
-Flink SQL> describe mytable;
-root
- |-- name: name
- |-- type: STRING
- |-- name: value
- |-- type: DOUBLE
+<table class="table table-bordered">
+  <thead>
+    <tr>
+        <th class="text-left" style="width: 20%">Key</th>
+        <th class="text-left" style="width: 15%">Default</th>
+        <th class="text-left" style="width: 10%">Type</th>
+        <th class="text-left" style="width: 55%">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td><h5>lookup.join.cache.ttl</h5></td>
+        <td style="word-wrap: break-word;">60 min</td>
+        <td>Duration</td>
+        <td>The cache TTL (e.g. 10min) for the build table in a lookup join. By default, the TTL is 60 minutes.</td>
+    </tr>
+  </tbody>
+</table>
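
As a rough sketch, a processing-time temporal join against a Hive table could look like the
following. The table and column names are illustrative, `orders` is assumed to be a streaming
table with a processing-time attribute `proc_time`, and the hint overrides the cache TTL for
this query only:

{% highlight sql %}
-- Hypothetical example: enrich a stream of orders with a Hive dimension table.
-- 'orders', 'dim_products', and their columns are assumptions made for illustration.
SELECT o.order_id, o.amount, p.category
FROM orders AS o
JOIN dim_products /*+ OPTIONS('lookup.join.cache.ttl'='12 h') */
  FOR SYSTEM_TIME AS OF o.proc_time AS p
ON o.product_id = p.product_id;
{% endhighlight %}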
 
-# ------ Select from hive table or hive view ------ 
-Flink SQL> SELECT * FROM mytable;
+**Notes**:
 
-   name      value
-__________ __________
+- Each joining subtask needs to keep its own cache of the Hive table. Please ensure the Hive table can fit into
+the memory of a TaskManager task slot.
+- It is encouraged to set a relatively large value for `lookup.join.cache.ttl`. Otherwise, jobs
+are prone to performance issues as the table needs to be updated and reloaded too frequently.
+- Currently, Flink simply loads the whole Hive table whenever the cache needs to be refreshed.
+There is no way to differentiate new data from the old.
 
-    Tom      4.72
-    John     8.0
-    Tom      24.2
-    Bob      3.14
-    Bob      4.72
-    Tom      34.9
-    Mary     4.79
-    Tiff     2.72
-    Bill     4.33
-    Mary     77.7
+### Vectorized Optimization upon Read
 
+Flink will automatically use vectorized reads of Hive tables when the following conditions are met:
+
+- Format: ORC or Parquet.
+- Columns without complex data types, like Hive types: List, Map, Struct, Union.
+
+This feature is enabled by default. 
+It may be disabled with the following configuration. 
+
+{% highlight bash %}
+table.exec.hive.fallback-mapred-reader=true
 {% endhighlight %}
 
-### Querying Hive views
+### Source Parallelism Inference
 
-If you need to query Hive views, please note:
+By default, Flink will infer the optimal parallelism for its Hive readers
+based on the number of files and the number of blocks in each file.
 
-1. You have to use the Hive catalog as your current catalog before you can 
query views in that catalog. It can be done by either 
`tableEnv.useCatalog(...)` in Table API or `USE CATALOG ...` in SQL Client.
-2. Hive and Flink SQL have different syntax, e.g. different reserved keywords 
and literals. Make sure the view's query is compatible with Flink grammar.
+Flink allows you to flexibly configure the policy of parallelism inference. 
You can configure the
+following parameters in `TableConfig` (note that these parameters affect all 
sources of the job):
 
-## Writing To Hive
+<table class="table table-bordered">
+  <thead>
+    <tr>
+        <th class="text-left" style="width: 20%">Key</th>
+        <th class="text-left" style="width: 15%">Default</th>
+        <th class="text-left" style="width: 10%">Type</th>
+        <th class="text-left" style="width: 55%">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td><h5>table.exec.hive.infer-source-parallelism</h5></td>
+        <td style="word-wrap: break-word;">true</td>
+        <td>Boolean</td>
+        <td>If true, source parallelism is inferred according to the number of splits. If false, the parallelism of the source is set by config.</td>
+    </tr>
+    <tr>
+        <td><h5>table.exec.hive.infer-source-parallelism.max</h5></td>
+        <td style="word-wrap: break-word;">1000</td>
+        <td>Integer</td>
+        <td>Sets the maximum inferred parallelism for the source operator.</td>
+    </tr>
+  </tbody>
+</table>
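
For example, these parameters can also be set from the SQL Client (the values below are only
illustrative):

{% highlight sql %}
-- Cap the inferred parallelism for Hive sources, or disable inference entirely.
SET table.exec.hive.infer-source-parallelism.max=100;
-- SET table.exec.hive.infer-source-parallelism=false;
{% endhighlight %}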
 
-Similarly, data can be written into hive using an `INSERT` clause.
+## Writing
 
-Consider there is an example table named "mytable" with two columns: name and 
age, in string and int type.
+Flink supports writing data to Hive in both `BATCH` and `STREAMING` modes. When run as a `BATCH`
+application, Flink will write to a Hive table only making those records visible when the job finishes.
+`BATCH` writes support both appending to and overwriting existing tables.
 
-{% highlight bash %}
+{% highlight sql %}
 # ------ INSERT INTO will append to the table or partition, keeping the 
existing data intact ------ 
 Flink SQL> INSERT INTO mytable SELECT 'Tom', 25;
 
 # ------ INSERT OVERWRITE will overwrite any existing data in the table or 
partition ------ 
 Flink SQL> INSERT OVERWRITE mytable SELECT 'Tom', 25;
 {% endhighlight %}
 
-We support partitioned table too, Consider there is a partitioned table named 
myparttable with four columns: name, age, my_type and my_date, in types ...... 
my_type and my_date are the partition keys.
+Data can also be inserted into a particular partitions. 

Review comment:
       ```suggestion
   Data can also be inserted into particular partitions. 
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

