[ https://issues.apache.org/jira/browse/FLINK-22356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17327066#comment-17327066 ]
Jark Wu commented on FLINK-22356:
---------------------------------

> Can't we fix the issue on the Hive side only? The sink factory can simply
> access the resolved schema and check the watermark type and pass it to the
> sink, no?

We can't, because the watermark information is defined on the source table, not the sink table, so the Hive sink factory can't access it.

> Btw isn't any usage of
> org.apache.flink.table.filesystem.DefaultPartTimeExtractor#toMills invalid,
> not only for PartitionTimeCommitter.

The other usage of {{#toMillis}} is for comparing the ordering of partitions, so that usage is valid.

> Filesystem/Hive partition file is not committed when watermark is applied on
> rowtime of TIMESTAMP_LTZ type
> ----------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-22356
>                 URL: https://issues.apache.org/jira/browse/FLINK-22356
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / FileSystem, Connectors / Hive, Table SQL / API
>            Reporter: Jark Wu
>            Assignee: Leonard Xu
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.13.0, 1.14.0
>
>
> {code:sql}
> set execution.checkpointing.interval = 10s;
> set table.local-time-zone = 'Asia/Shanghai';
> create table mysource (
>   ms bigint,
>   ts as to_timestamp_ltz(ms, 3),
>   watermark for ts as ts - interval '0.001' second
> ) with (
>   'connector' = 'socket',
>   'format' = 'json',
>   'hostname' = '127.0.0.1',
>   'port' = '9999'
> );
> CREATE TABLE fs_table2 (
>   ms bigint,
>   dt STRING,
>   `hour` STRING,
>   `mm` string
> ) PARTITIONED BY (dt, `hour`, `mm`) WITH (
>   'connector'='filesystem',
>   'path'='/Users/wuchong/Downloads/fs_table2',
>   'format'='csv',
>   'sink.partition-commit.delay'='1min',
>   'sink.partition-commit.policy.kind'='success-file',
>   'sink.rolling-policy.rollover-interval' = '30s',
>   'sink.rolling-policy.check-interval' = '30s',
>   'sink.partition-commit.trigger'='partition-time',
>   'partition.time-extractor.timestamp-pattern' = '$dt $hour:$mm:00'
> );
> insert into fs_table2
> SELECT ms,
>   DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm')
> FROM mysource;
> {code}
> Enter some data into the socket:
> {code}
> > nc -lk 9999
> {"ms": 1618839600000}
> {"ms": 1618839600123}
> {"ms": 1618839600456}
> {"ms": 1618839600789}
> {"ms": 1618839660000}
> {"ms": 1618839660123}
> {"ms": 1618839660456}
> {"ms": 1618839660789}
> {"ms": 1618839720000}
> {"ms": 1618839780000}
> {"ms": 1618839840000}
> {"ms": 1618839900000}
> {"ms": 1618839960000}
> {"ms": 1618840020000}
> {code}
> However, none of the files are committed (there is no {{_SUCCESS}} file):
> {code}
> ➜  hour=21 tree
> .
> ├── mm=40
> │   └── part-cf06c6da-d301-4623-832c-9e0f356f6fb4-0-0
> ├── mm=41
> │   └── part-cf06c6da-d301-4623-832c-9e0f356f6fb4-0-1
> ├── mm=42
> │   └── part-cf06c6da-d301-4623-832c-9e0f356f6fb4-0-2
> ├── mm=43
> │   └── part-cf06c6da-d301-4623-832c-9e0f356f6fb4-0-3
> ├── mm=44
> │   └── part-cf06c6da-d301-4623-832c-9e0f356f6fb4-0-4
> ├── mm=45
> │   └── part-cf06c6da-d301-4623-832c-9e0f356f6fb4-0-5
> ├── mm=46
> │   └── part-cf06c6da-d301-4623-832c-9e0f356f6fb4-0-6
> └── mm=47
>     └── part-cf06c6da-d301-4623-832c-9e0f356f6fb4-0-7
> 8 directories, 8 files
> {code}
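A minimal sketch of the likely mechanism, written as a hypothetical Java illustration rather than Flink's actual code. The partition time extracted through `'$dt $hour:$mm:00'` is a wall-clock `LocalDateTime` in the session time zone (`Asia/Shanghai`), while a watermark on a `TIMESTAMP_LTZ` rowtime is an epoch-millisecond instant. Assuming the partition time is converted to milliseconds as if it were UTC, it lands 8 hours ahead of the watermark, so the commit check never passes. The class name `PartitionCommitCheck`, its helpers, and the commit condition `watermark > partitionTime + delay` are all assumptions made for illustration.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

public class PartitionCommitCheck {

    // Hypothetical helper: reads the wall-clock partition time as if it were UTC
    // (assumed to mirror the toMills-style conversion discussed in the comment).
    static long toMillisAsUtc(LocalDateTime partTime) {
        return partTime.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    // Hypothetical helper: reads the partition time in the session time zone.
    static long toMillisInZone(LocalDateTime partTime, ZoneId zone) {
        return partTime.atZone(zone).toInstant().toEpochMilli();
    }

    // Assumed commit condition of the partition-time trigger:
    // commit once the watermark has passed partition time + commit delay.
    static boolean shouldCommit(long watermarkMillis, long partTimeMillis, long delayMillis) {
        return watermarkMillis > partTimeMillis + delayMillis;
    }

    public static void main(String[] args) {
        ZoneId shanghai = ZoneId.of("Asia/Shanghai"); // table.local-time-zone
        long delay = 60_000L;                          // 'sink.partition-commit.delay'='1min'

        // Partition dt=2021-04-19/hour=21/mm=40, i.e. '$dt $hour:$mm:00'.
        LocalDateTime partTime = LocalDateTime.parse("2021-04-19T21:40:00");
        // Last record {"ms": 1618840020000}; watermark is ts - 1 ms.
        long watermark = 1618840020000L - 1;

        long utcMillis = toMillisAsUtc(partTime);
        long zonedMillis = toMillisInZone(partTime, shanghai);
        System.out.println("as UTC:      " + utcMillis + " commit=" + shouldCommit(watermark, utcMillis, delay));
        System.out.println("in Shanghai: " + zonedMillis + " commit=" + shouldCommit(watermark, zonedMillis, delay));
        System.out.println("offset (h):  " + (utcMillis - zonedMillis) / 3_600_000L);

        // Ordering between partitions is unaffected: for a fixed-offset zone such as
        // Asia/Shanghai, both conversions shift every partition by the same amount.
        LocalDateTime later = LocalDateTime.parse("2021-04-19T21:41:00");
        boolean sameOrder = Long.compare(toMillisAsUtc(partTime), toMillisAsUtc(later))
                == Long.compare(toMillisInZone(partTime, shanghai), toMillisInZone(later, shanghai));
        System.out.println("ordering preserved: " + sameOrder);
    }
}
```

Under these assumptions, the UTC interpretation prints `commit=false` while the `Asia/Shanghai` interpretation prints `commit=true`, and the gap is exactly the 8-hour zone offset, so partitions would only be committed hours late. The last check reflects the point above that using `toMillis` only for ordering partitions stays valid, because a constant shift does not change the order.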