[ 
https://issues.apache.org/jira/browse/FLINK-21871?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Gao updated FLINK-21871:
----------------------------
    Fix Version/s: 1.16.0

> Support watermark for Hive and Filesystem streaming source
> ----------------------------------------------------------
>
>                 Key: FLINK-21871
>                 URL: https://issues.apache.org/jira/browse/FLINK-21871
>             Project: Flink
>          Issue Type: New Feature
>          Components: Connectors / FileSystem, Connectors / Hive, Table SQL / 
> API
>            Reporter: Jark Wu
>            Priority: Major
>              Labels: auto-unassigned, pull-request-available
>             Fix For: 1.15.0, 1.16.0
>
>
> Hive and Filesystem already support streaming sources. However, they don't 
> support watermarks on the source. That means users can't use the streaming 
> source for Flink's powerful streaming analysis, e.g. window aggregates, 
> interval joins, and so on. 
> To let more Hive users leverage Flink for streaming analysis, and to 
> cooperate with the new optimized window-TVF operations (FLIP-145), we need 
> to support watermarks for Hive and Filesystem. 
> h2. How to emit watermark in Hive and Filesystem
> Factual data in Hive is usually partitioned by date and time, e.g. 
> {{pt_day=2021-03-19, pt_hour=10}}. In this case, once the data of partition 
> {{pt_day=2021-03-19, pt_hour=10}} has been emitted, we know that all the 
> data before {{2021-03-19 11:00:00}} has arrived, so we can emit a watermark 
> of {{2021-03-19 11:00:00}}. We call this a partition watermark. 
> The partition watermark is much better than a record watermark (a watermark 
> extracted from the records, e.g. {{ts - INTERVAL '1' MINUTE}}). In the 
> example above, with a partition watermark the window [10:00, 11:00) is 
> triggered as soon as pt_hour=10 is finished. With a record watermark, the 
> window [10:00, 11:00) is only triggered once data of pt_hour=11 arrives, 
> which delays the pipeline by one more partition. 
> Therefore, we first focus on supporting partition watermarks for Hive and 
> Filesystem.
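> The difference in trigger time can be checked with a small Python sketch. 
> It is only an illustration, not connector code: the firing rule (the 
> watermark must reach the last millisecond of the window) and the helper 
> names {{window_fires}} and {{record_watermark}} are assumptions made for 
> this example.

```python
from datetime import datetime, timedelta


def window_fires(watermark, window_end):
    # Assumed rule: a [start, end) window fires once the watermark has
    # reached the last timestamp that still belongs to the window.
    return watermark >= window_end - timedelta(milliseconds=1)


def record_watermark(max_ts, delay=timedelta(minutes=1)):
    # Record watermark as in the example: ts - INTERVAL '1' MINUTE.
    return max_ts - delay


window_end = datetime(2021, 3, 19, 11, 0, 0)

# Partition watermark emitted when pt_hour=10 is finished.
partition_wm = datetime(2021, 3, 19, 11, 0, 0)

# Record watermark after the latest possible row of pt_hour=10 ...
record_wm_after_pt10 = record_watermark(datetime(2021, 3, 19, 10, 59, 59, 999000))
# ... and after a (hypothetical) row of pt_hour=11 with ts = 11:01:00.
record_wm_after_pt11_row = record_watermark(datetime(2021, 3, 19, 11, 1, 0))

fires_with_partition_wm = window_fires(partition_wm, window_end)
fires_after_pt10_records = window_fires(record_wm_after_pt10, window_end)
fires_after_pt11_record = window_fires(record_wm_after_pt11_row, window_end)
```

> With the partition watermark the window fires right after pt_hour=10. With 
> the record watermark it stays open until a row of pt_hour=11 is read.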
> h2. Example
> In order to support such watermarks, we propose using the following DDL to 
> define a Hive table with a watermark:
> {code:sql}
> -- using hive dialect
> CREATE TABLE hive_table (
>   x int, 
>   y string,
>   z int,
>   ts timestamp,
>   WATERMARK FOR ts AS SOURCE_WATERMARK
> ) PARTITIONED BY (pt_day string, pt_hour string) 
> TBLPROPERTIES (
>   'streaming-source.enable'='true',
>   'streaming-source.monitor-interval'='1s',
>   'partition.time-extractor.timestamp-pattern'='$pt_day $pt_hour:00:00',
>   'partition.time-interval'='1h'
> );
> -- window aggregate on the hive table
> SELECT window_start, window_end, COUNT(*), MAX(y), SUM(z)
> FROM TABLE(
>    TUMBLE(TABLE hive_table, DESCRIPTOR(ts), INTERVAL '1' HOUR))
> GROUP BY window_start, window_end;
> {code}
> For the filesystem connector, the DDL can be:
> {code:sql}
> CREATE TABLE fs_table (
>     x int,
>     y string,
>     z int,
>     ts TIMESTAMP(3),
>     pt_day string,
>     pt_hour string,
>     WATERMARK FOR ts AS SOURCE_WATERMARK
> ) PARTITIONED BY (pt_day, pt_hour)
>   WITH (
>     'connector' = 'filesystem',
>     'path' = '/path/to/file',
>     'format' = 'parquet',
>     'streaming-source.enable'='true',
>     'streaming-source.monitor-interval'='1s',
>     'partition.time-extractor.timestamp-pattern'='$pt_day $pt_hour:00:00',
>     'partition.time-interval'='1h'
> );
> {code}
> I will explain the new function and table options below. 
> h2. SOURCE_WATERMARK built-in function
> FLIP-66[1] proposed a {{SYSTEM_WATERMARK}} function for watermarks preserved 
> in the underlying source system. 
> However, the SYSTEM prefix sounds like a value generated by the Flink 
> system, whereas it is actually generated by the SOURCE system. 
> So I propose to use {{SOURCE_WATERMARK}} instead. This also keeps the 
> concept aligned with the API of 
> {{org.apache.flink.table.descriptors.Rowtime#watermarksFromSource}}.
> h2. Table Options for Watermark
> - {{partition.time-extractor.timestamp-pattern}}: this option already exists. 
> It is used to extract/convert partition values to a timestamp value.
> - {{partition.time-interval}}: this is a new option. It indicates the minimal 
> time interval of the partitions. It's used to calculate the correct watermark 
> when a partition is finished: watermark = partition-timestamp + 
> time-interval.
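> As a worked example of how the two options combine, here is a minimal 
> Python sketch. It is not the connector implementation: the helpers 
> {{extract_partition_time}}, {{parse_interval}} and {{partition_watermark}} 
> are hypothetical, and the interval parser only handles simple values such 
> as '1h'.

```python
import re
from datetime import datetime, timedelta

# Units understood by this sketch only (an assumption, not the real option syntax).
_UNITS = {"s": "seconds", "min": "minutes", "h": "hours", "d": "days"}


def extract_partition_time(pattern, partition):
    # Substitute each $key in the pattern with its partition value,
    # longest keys first so one key cannot clobber a longer one.
    text = pattern
    for key in sorted(partition, key=len, reverse=True):
        text = text.replace("$" + key, partition[key])
    return datetime.strptime(text, "%Y-%m-%d %H:%M:%S")


def parse_interval(text):
    match = re.fullmatch(r"(\d+)\s*(s|min|h|d)", text.strip())
    if match is None:
        raise ValueError(f"unsupported interval: {text!r}")
    amount, unit = match.groups()
    return timedelta(**{_UNITS[unit]: int(amount)})


def partition_watermark(pattern, interval, partition):
    # watermark = partition-timestamp + time-interval
    return extract_partition_time(pattern, partition) + parse_interval(interval)


watermark = partition_watermark(
    "$pt_day $pt_hour:00:00",
    "1h",
    {"pt_day": "2021-03-19", "pt_hour": "10"},
)
```

> For partition {{pt_day=2021-03-19, pt_hour=10}} this yields the watermark 
> {{2021-03-19 11:00:00}} used in the example above.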
> h2. How to support watermark for existing Hive tables
> We can't create a new table for an existing Hive table, so we should 
> support altering an existing Hive table to add the watermark information. 
> This can be supported by the new ALTER TABLE syntax proposed in FLINK-21634. 
> Because the watermark, computed columns, and table options are all encoded 
> in Hive table parameters, other systems (e.g. Hive MR, Spark) can still 
> read this Hive table as usual. 
> {code:sql}
> ALTER TABLE hive_table ADD (
>   WATERMARK FOR ts AS SOURCE_WATERMARK
> );
> ALTER TABLE hive_table SET (
>   'streaming-source.enable'='true',
>   'streaming-source.monitor-interval'='1s',
>   'partition.time-extractor.timestamp-pattern'='$pt_day $pt_hour:00:00',
>   'partition.time-interval'='1h'
> );
> {code}
> h2. Implementation Details
> 1. SplitEnumerator: monitors new partitions through {{PartitionMonitor}}, 
> sorts partitions by partition name, adds the splits of new partitions to 
> {{SplitAssigner}}, and tags the last split of each partition.
> 2. SourceReader: requests a split from the SplitEnumerator at setup and 
> whenever it has read out a split.
> 3. SplitEnumerator: gets a split from the SplitAssigner and assigns it to 
> the requesting reader. If the split is the last one of the partition, it 
> then broadcasts a watermark event to all the readers.
> 4. SourceReader receives a split: starts to read the data of the assigned 
> split.
> 5. SourceReader receives a watermark: if there are assigned splits, it 
> outputs the received watermark once those splits are read out. If there are 
> no assigned splits, it outputs the received watermark immediately. 
> Note: the SplitAssigner should assign splits in FIFO order. 
> The above implementation doesn't require a new interface or new method in 
> the FLIP-27 source. Everything can be implemented in the Hive/Filesystem 
> connector module. 
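> The protocol in steps 1 to 5 can be simulated with a short Python sketch. 
> It is a toy model under stated assumptions, not the FLIP-27 API: the 
> classes {{Enumerator}} and {{Reader}}, their methods, and the split names 
> are invented for illustration, and each reader holds at most one split at 
> a time.

```python
from collections import deque


class Reader:
    def __init__(self):
        self.current = None  # split assigned but not yet read out
        self.pending = []    # watermarks held back until the split is read out
        self.output = []     # what the reader emits downstream, in order

    def receive_split(self, split):
        self.current = split

    def receive_watermark(self, watermark):
        if self.current is not None:
            self.pending.append(watermark)               # step 5, busy reader
        else:
            self.output.append(("watermark", watermark))  # step 5, idle reader

    def read_out(self):
        self.output.append(("data", self.current))
        self.current = None
        self.output.extend(("watermark", w) for w in self.pending)
        self.pending.clear()


class Enumerator:
    def __init__(self, partitions, readers):
        # Step 1: sort partitions by name, enqueue splits in FIFO order and
        # tag the last split of each partition with the partition watermark.
        self.queue = deque()
        for name in sorted(partitions):
            watermark, splits = partitions[name]
            for i, split in enumerate(splits):
                tag = watermark if i == len(splits) - 1 else None
                self.queue.append((split, tag))
        self.readers = readers

    def handle_request(self, reader):
        # Step 3: assign the next split; broadcast on a partition's last split.
        if not self.queue:
            return False
        split, watermark = self.queue.popleft()
        reader.receive_split(split)
        if watermark is not None:
            for r in self.readers:
                r.receive_watermark(watermark)
        return True


a, b = Reader(), Reader()
enumerator = Enumerator(
    {
        "pt_hour=10": ("2021-03-19 11:00:00", ["p10-s0", "p10-s1"]),
        "pt_hour=11": ("2021-03-19 12:00:00", ["p11-s0"]),
    },
    [a, b],
)
enumerator.handle_request(a)  # a gets p10-s0
enumerator.handle_request(b)  # b gets p10-s1 (last split): broadcast 11:00
a.read_out()
enumerator.handle_request(a)  # a gets p11-s0 (last split): broadcast 12:00
b.read_out()
enumerator.handle_request(b)  # queue is empty, nothing assigned
a.read_out()
```

> In this run neither reader emits the {{11:00:00}} watermark before it has 
> finished its own split of pt_hour=10, which is the property step 5 is 
> meant to guarantee.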
> [1]: 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-66%3A+Support+Time+Attribute+in+SQL+DDL



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
