[ https://issues.apache.org/jira/browse/FLINK-21871?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yun Gao updated FLINK-21871:
----------------------------
    Fix Version/s: 1.16.0

> Support watermark for Hive and Filesystem streaming source
> ----------------------------------------------------------
>
>                 Key: FLINK-21871
>                 URL: https://issues.apache.org/jira/browse/FLINK-21871
>             Project: Flink
>          Issue Type: New Feature
>          Components: Connectors / FileSystem, Connectors / Hive, Table SQL / API
>            Reporter: Jark Wu
>            Priority: Major
>              Labels: auto-unassigned, pull-request-available
>             Fix For: 1.15.0, 1.16.0
>
>
> Hive and Filesystem already support streaming sources. However, they don't support watermarks on the source. That means users can't use the streaming source for Flink's powerful streaming analysis, e.g. window aggregates, interval joins, and so on.
> To let more Hive users perform streaming analysis with Flink, and to work with the new optimized window-TVF operations (FLIP-145), we need to support watermarks for Hive and Filesystem.
>
> h2. How to emit watermark in Hive and Filesystem
> Fact data in Hive is usually partitioned by date and time, e.g. {{pt_day=2021-03-19, pt_hour=10}}. In this case, once the data of partition {{pt_day=2021-03-19, pt_hour=10}} has been emitted, we know that all data before {{2021-03-19 11:00:00}} has arrived, so we can emit a watermark of {{2021-03-19 11:00:00}}. We call this a partition watermark.
> A partition watermark is much better than a record watermark (a watermark extracted from the record, e.g. {{ts - INTERVAL '1' MINUTE}}). In the example above, with a partition watermark the window [10:00, 11:00) is triggered when pt_hour=10 is finished. With a record watermark, the window [10:00, 11:00) is only triggered when pt_hour=11 arrives, which delays the pipeline by one more partition.
> Therefore, we first focus on supporting partition watermarks for Hive and Filesystem.
>
> h2. Example
> To support such watermarks, we propose using the following DDL to define a Hive table with a watermark:
> {code:sql}
> -- using hive dialect
> CREATE TABLE hive_table (
>   x int,
>   y string,
>   z int,
>   ts timestamp,
>   WATERMARK FOR ts AS SOURCE_WATERMARK
> ) PARTITIONED BY (pt_day string, pt_hour string)
> TBLPROPERTIES (
>   'streaming-source.enable'='true',
>   'streaming-source.monitor-interval'='1s',
>   'partition.time-extractor.timestamp-pattern'='$pt_day $pt_hour:00:00',
>   'partition.time-interval'='1h'
> );
> -- window aggregate on the hive table
> SELECT window_start, window_end, COUNT(*), MAX(y), SUM(z)
> FROM TABLE(
>   TUMBLE(TABLE hive_table, DESCRIPTOR(ts), INTERVAL '1' HOUR))
> GROUP BY window_start, window_end;
> {code}
> For the filesystem connector, the DDL can be:
> {code:sql}
> CREATE TABLE fs_table (
>   x int,
>   y string,
>   z int,
>   ts TIMESTAMP(3),
>   pt_day string,
>   pt_hour string,
>   WATERMARK FOR ts AS SOURCE_WATERMARK
> ) PARTITIONED BY (pt_day, pt_hour)
> WITH (
>   'connector' = 'filesystem',
>   'path' = '/path/to/file',
>   'format' = 'parquet',
>   'streaming-source.enable'='true',
>   'streaming-source.monitor-interval'='1s',
>   'partition.time-extractor.timestamp-pattern'='$pt_day $pt_hour:00:00',
>   'partition.time-interval'='1h'
> );
> {code}
> The new function and configuration options are explained below.
>
> h2. SOURCE_WATERMARK built-in function
> FLIP-66[1] proposed a {{SYSTEM_WATERMARK}} function for watermarks preserved in the underlying source system.
> However, the SYSTEM prefix sounds like a value generated by the Flink system, whereas it is actually generated by the SOURCE system.
> So I propose to use {{SOURCE_WATERMARK}} instead. This also keeps the concept aligned with the API of {{org.apache.flink.table.descriptors.Rowtime#watermarksFromSource}}.
>
> h2. Table Options for Watermark
> - {{partition.time-extractor.timestamp-pattern}}: this option already exists. It is used to extract/convert the partition values to a timestamp value.
> - {{partition.time-interval}}: this is a new option. It indicates the minimal time interval of the partitions. It is used to calculate the correct watermark when a partition is finished: watermark = partition-timestamp + time-interval.
>
> h2. How to support watermark for existing Hive tables
> We can't create a new table for an existing Hive table, so we should support altering an existing Hive table to add the watermark information.
> This can be supported by the new ALTER TABLE syntax proposed in FLINK-21634.
> Because the watermark, computed columns, and table options are all encoded in Hive table parameters, other systems (e.g. Hive MR, Spark) can still read this Hive table as usual.
> {code:sql}
> ALTER TABLE hive_table ADD (
>   WATERMARK FOR ts AS SOURCE_WATERMARK
> );
> ALTER TABLE hive_table SET (
>   'streaming-source.enable'='true',
>   'streaming-source.monitor-interval'='1s',
>   'partition.time-extractor.timestamp-pattern'='$pt_day $pt_hour:00:00',
>   'partition.time-interval'='1h'
> );
> {code}
>
> h2. Implementation Details
> 1. SplitEnumerator: monitors new partitions through {{PartitionMonitor}}, sorts partitions by partition name, adds the splits of new partitions to {{SplitAssigner}}, and tags the last split of each partition.
> 2. SourceReader: requests a split from the SplitEnumerator at setup or after it has read out a split.
> 3. SplitEnumerator: gets a split from the SplitAssigner and assigns it to the requesting reader. If the split is the last one of its partition, it broadcasts a watermark event to all readers.
> 4. SourceReader receives a split: it starts reading the data of the assigned split.
> 5. SourceReader receives a watermark: if it has assigned splits, it outputs the received watermark once those splits are read out. If it has no assigned splits, it outputs the received watermark immediately.
> Note: the SplitAssigner should assign splits in FIFO order.
> The above implementation doesn't require any new interface or new method in the FLIP-27 source.
> All of it can be implemented in the Hive/Filesystem connector module.
>
> [1]: https://cwiki.apache.org/confluence/display/FLINK/FLIP-66%3A+Support+Time+Attribute+in+SQL+DDL
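To illustrate the rule from "Table Options for Watermark" (watermark = partition-timestamp + time-interval), here is a minimal sketch in plain Java. It is not the connector's implementation: the class and method names are hypothetical, the interval is passed as a {{java.time.Duration}} instead of being parsed from {{'1h'}}, and the timestamp format is assumed to be {{yyyy-MM-dd HH:mm:ss}}, which is what the pattern {{$pt_day $pt_hour:00:00}} from the example produces.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;

// Hypothetical helper, for illustration only; not part of Flink.
final class PartitionWatermarkSketch {

    // Assumption: the substituted pattern always has this shape.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    private PartitionWatermarkSketch() {}

    /**
     * Replaces every "$key" placeholder in the pattern with the matching
     * partition value and parses the result as a timestamp.
     * Note: plain literal replacement, so keys that are prefixes of other
     * keys (e.g. "pt" and "pt_day") are not handled by this sketch.
     */
    static LocalDateTime partitionTimestamp(String pattern, Map<String, String> partition) {
        String text = pattern;
        for (Map.Entry<String, String> entry : partition.entrySet()) {
            text = text.replace("$" + entry.getKey(), entry.getValue());
        }
        return LocalDateTime.parse(text, FORMAT);
    }

    /** watermark = partition-timestamp + time-interval */
    static LocalDateTime partitionWatermark(
            String pattern, Map<String, String> partition, Duration interval) {
        return partitionTimestamp(pattern, partition).plus(interval);
    }
}
```

With the pattern {{$pt_day $pt_hour:00:00}}, the partition {{pt_day=2021-03-19, pt_hour=10}}, and an interval of one hour, {{partitionWatermark}} yields 2021-03-19 11:00:00, matching the partition watermark described in the issue.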