[ https://issues.apache.org/jira/browse/FLINK-22356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17330008#comment-17330008 ]

Timo Walther commented on FLINK-22356:
--------------------------------------

I understand the reasoning behind it, but this could also be achieved by casting
the TIMESTAMP to TIMESTAMP_LTZ under the hood. We could make the WATERMARK
always TIMESTAMP_LTZ by inserting a CAST(watermark_expression AS TIMESTAMP_LTZ)
in the watermark generator. In that case, we would only need to worry about
casting TIMESTAMP to TIMESTAMP_LTZ in window operators, etc.
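
The proposal relies on the semantics of casting a zone-less TIMESTAMP to TIMESTAMP_LTZ: the wall-clock value is read in the session time zone and becomes an instant on the epoch axis. Below is a minimal Python model of that cast, not Flink code. The function name `cast_timestamp_to_ltz` is hypothetical, and a fixed UTC+8 offset stands in for `table.local-time-zone = 'Asia/Shanghai'` (China has no DST).

```python
from datetime import datetime, timedelta, timezone

# Assumption: a fixed UTC+8 offset models the session zone 'Asia/Shanghai'.
SESSION_TZ = timezone(timedelta(hours=8))


def cast_timestamp_to_ltz(wall_clock: datetime, session_tz: timezone) -> int:
    """Hypothetical model of CAST(ts AS TIMESTAMP_LTZ): interpret a zone-less
    TIMESTAMP in the session time zone and return the instant as epoch millis."""
    if wall_clock.tzinfo is not None:
        raise ValueError("TIMESTAMP values carry no time zone")
    return int(wall_clock.replace(tzinfo=session_tz).timestamp() * 1000)


# The same event as a TIMESTAMP rowtime (wall clock) and as a TIMESTAMP_LTZ
# rowtime (instant): 2021-04-19 21:40:00 in Shanghai == epoch 1618839600000.
ts_value = datetime(2021, 4, 19, 21, 40)
ltz_value_ms = 1618839600000

# After the cast, both watermarks live on the same epoch-millis axis,
# so downstream operators can compare them directly.
assert cast_timestamp_to_ltz(ts_value, SESSION_TZ) == ltz_value_ms

# Skipping the session zone and reading the wall clock as UTC is 8 hours off.
naive_as_utc_ms = cast_timestamp_to_ltz(ts_value, timezone.utc)
print(naive_as_utc_ms - ltz_value_ms)  # prints 28800000
```

The guard on `tzinfo` mirrors the fact that a TIMESTAMP has no zone of its own; only the session setting gives it one.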

> Filesystem/Hive partition file is not committed when watermark is applied on rowtime of TIMESTAMP_LTZ type
> ----------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-22356
>                 URL: https://issues.apache.org/jira/browse/FLINK-22356
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / FileSystem, Connectors / Hive, Table SQL / API
>            Reporter: Jark Wu
>            Assignee: Leonard Xu
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.13.0, 1.14.0
>
>
> {code:sql}
> set execution.checkpointing.interval = 10s;
> set table.local-time-zone = 'Asia/Shanghai';
> create table mysource (
>   ms bigint,
>   ts as to_timestamp_ltz(ms, 3),
>   watermark for ts as ts - interval '0.001' second
> ) with (
>   'connector' = 'socket',
>   'format' = 'json',
>   'hostname' = '127.0.0.1',
>   'port' = '9999'
> );
> CREATE TABLE fs_table2 (
>   ms bigint,
>   dt STRING,
>   `hour` STRING,
>   `mm` string
> ) PARTITIONED BY (dt, `hour`, `mm`) WITH (
>   'connector'='filesystem',
>   'path'='/Users/wuchong/Downloads/fs_table2',
>   'format'='csv',
>   'sink.partition-commit.delay'='1min',
>   'sink.partition-commit.policy.kind'='success-file',
>   'sink.rolling-policy.rollover-interval' = '30s',
>   'sink.rolling-policy.check-interval' = '30s',
>   'sink.partition-commit.trigger'='partition-time',
>   'partition.time-extractor.timestamp-pattern' = '$dt $hour:$mm:00'
> );
> insert into fs_table2
> SELECT ms,
> DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm')
> FROM mysource;
> {code}
> Enter some data into the socket:
> {code}
> > nc -lk 9999
> {"ms": 1618839600000}
> {"ms": 1618839600123}
> {"ms": 1618839600456}
> {"ms": 1618839600789}
> {"ms": 1618839660000}
> {"ms": 1618839660123}
> {"ms": 1618839660456}
> {"ms": 1618839660789}
> {"ms": 1618839720000}
> {"ms": 1618839780000}
> {"ms": 1618839840000}
> {"ms": 1618839900000}
> {"ms": 1618839960000}
> {"ms": 1618840020000}
> {code}
> However, none of the partitions are committed (there is no {{_SUCCESS}} file):
> {code}
> ➜  hour=21 tree
> .
> ├── mm=40
> │   └── part-cf06c6da-d301-4623-832c-9e0f356f6fb4-0-0
> ├── mm=41
> │   └── part-cf06c6da-d301-4623-832c-9e0f356f6fb4-0-1
> ├── mm=42
> │   └── part-cf06c6da-d301-4623-832c-9e0f356f6fb4-0-2
> ├── mm=43
> │   └── part-cf06c6da-d301-4623-832c-9e0f356f6fb4-0-3
> ├── mm=44
> │   └── part-cf06c6da-d301-4623-832c-9e0f356f6fb4-0-4
> ├── mm=45
> │   └── part-cf06c6da-d301-4623-832c-9e0f356f6fb4-0-5
> ├── mm=46
> │   └── part-cf06c6da-d301-4623-832c-9e0f356f6fb4-0-6
> └── mm=47
>     └── part-cf06c6da-d301-4623-832c-9e0f356f6fb4-0-7
> 8 directories, 8 files
> {code}
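
One way to see why no {{_SUCCESS}} file appears: the watermark of a TIMESTAMP_LTZ rowtime is an epoch instant, while the partition-time extractor rebuilds a wall-clock value from '$dt $hour:$mm:00'. If that wall-clock value is converted to milliseconds as if it were UTC, every Asia/Shanghai partition lands 8 hours after its real events and the watermark never catches up. The Python sketch below replays the issue's input under that assumption. It is not Flink's implementation: the commit rule `watermark > partition time + delay`, the fixed UTC+8 offset for Asia/Shanghai, and the helper names are all hypothetical.

```python
from datetime import datetime, timedelta, timezone

# Assumption: a fixed UTC+8 offset models 'Asia/Shanghai' (no DST in 2021).
SHANGHAI = timezone(timedelta(hours=8))

# The `ms` values typed into the socket in the issue (epoch milliseconds).
EVENTS_MS = [
    1618839600000, 1618839600123, 1618839600456, 1618839600789,
    1618839660000, 1618839660123, 1618839660456, 1618839660789,
    1618839720000, 1618839780000, 1618839840000, 1618839900000,
    1618839960000, 1618840020000,
]
COMMIT_DELAY_MS = 60_000  # 'sink.partition-commit.delay'='1min'
WATERMARK_DELAY_MS = 1    # watermark for ts as ts - interval '0.001' second


def partition_of(ms):
    """Hypothetical model of DATE_FORMAT(ts, ...) in the Shanghai session zone."""
    local = datetime.fromtimestamp(ms / 1000, tz=SHANGHAI)
    return local.strftime("%Y-%m-%d"), local.strftime("%H"), local.strftime("%M")


def partition_time_ms(dt, hour, mm, tz):
    """Parse '$dt $hour:$mm:00' and read the wall-clock value in zone tz."""
    naive = datetime.strptime(f"{dt} {hour}:{mm}:00", "%Y-%m-%d %H:%M:%S")
    return int(naive.replace(tzinfo=tz).timestamp() * 1000)


def committed(tz):
    """Partitions that pass the assumed rule: watermark > part time + delay."""
    watermark = max(EVENTS_MS) - WATERMARK_DELAY_MS
    parts = sorted({partition_of(ms) for ms in EVENTS_MS})
    return [p for p in parts
            if watermark > partition_time_ms(*p, tz) + COMMIT_DELAY_MS]


buggy = committed(timezone.utc)  # wall clock read as UTC: 8 hours too late
fixed = committed(SHANGHAI)      # wall clock read in the session time zone
print(len(buggy), [mm for _, _, mm in fixed])
# prints: 0 ['40', '41', '42', '43', '44', '45']
```

In this model the eight partitions match the directory tree above (mm=40 to mm=47). Reading partition times in UTC commits none of them; reading them in the session zone commits mm=40 to mm=45, while mm=46 and mm=47 still wait because the final watermark (21:46:59.999) has not passed their partition time plus the one-minute delay.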



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
