sink.partition-commit.trigger 
<https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#sink-partition-commit-trigger>
 process-time    String  Trigger type for partition commit: 'process-time': 
based on the time of the machine, it neither requires partition time extraction 
nor watermark generation. Commit partition once the 'current system time' 
passes 'partition creation system time' plus 'delay'. 'partition-time': based 
on the time that extracted from partition values, it requires watermark 
generation. Commit partition once the 'watermark' passes 'time extracted from 
partition values' plus 'delay'.
sink.partition-commit.delay 
<https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#sink-partition-commit-delay>
     0 s     Duration        The partition will not commit until the delay 
time. If it is a daily partition, should be '1 d', if it is a hourly partition, 
should be '1 h'.
这两个参数都没有设置,都是默认值

> 2020年11月12日 下午2:15,admin <17626017...@163.com> 写道:
> 
> Hi ,kandy
> 我没有基于partition time 提交分区,我是基于默认的process 
> time,所以是可以多次提交分区的,我知道在当前分区内的乱序数据可以提交,但是有延迟时间比较长的数据(比如上面的例子)是否还能被提交到对应分区
> 
>> 2020年11月12日 下午12:46,kandy.wang <kandy1...@163.com> 写道:
>> 
>> hi:
>> 按照我的理解,partition time提交分区,是会在current watermark  > partition time  + commit 
>> delay 时机触发分区提交,得看你的sink.partition-commit.delay
>> 设置的多久,如果超过之后,应当默认是会丢弃的吧。
>> 
>> 
>> https://cloud.tencent.com/developer/article/1707182
>> 
>> 这个连接可以看一下 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 在 2020-11-12 11:58:22,"admin" <17626017...@163.com> 写道:
>>> Hi,all
>>> Flink 1.11的filesystem connector,partition trigger[1]都是使用的默认值,所以分区可以多次提交
>>> 现在有这样的场景:
>>> 消费kafka数据写入hdfs中,分区字段是 day + hour ,是从事件时间截取出来的,如果数据延迟了,比如现在是19点了,来了17点的数据,
>>> 这条数据还能正确的写到17点分区里面吗?还是写到19点分区?还是会被丢弃?
>>> 有大佬知道吗,有实际验证过吗
>>> 感谢
>>> 
>>> 附上简单sql:
>>> CREATE TABLE kafka (
>>>  a STRING,
>>>  b STRING,
>>>  c BIGINT,
>>>  process_time BIGINT,
>>>  e STRING,
>>>  f STRING,
>>>  g STRING,
>>>  h INT,
>>>  i STRING
>>> ) WITH (
>>>  'connector' = 'kafka',
>>>  'topic' = 'topic',
>>>  'properties.bootstrap.servers' = 'x',
>>>  'properties.group.id' = 'test-1',
>>>  'scan.startup.mode' = 'latest-offset',
>>>  'format' = 'json',
>>>  'properties.flink.partition-discovery.interval-millis' = '300000'
>>> );
>>> 
>>> CREATE TABLE filesystem (
>>>  `day` STRING,
>>>  `hour` STRING,
>>>  a STRING,
>>>  b STRING,
>>>  c BIGINT,
>>>  d BIGINT,
>>>  e STRING,
>>>  f STRING,
>>>  g STRING,
>>>  h INT,
>>>  i STRING
>>> ) PARTITIONED BY (`day`, `hour`) WITH (
>>>  'connector' = 'filesystem',
>>>  'format' = 'parquet',
>>>  'path' = 'hdfs://xx',
>>>  'parquet.compression'='SNAPPY',
>>>  'sink.partition-commit.policy.kind' = 'success-file'
>>> );
>>> 
>>> insert into filesystem
>>> select
>>>  from_unixtime(process_time,'yyyy-MM-dd') as `day`,
>>>  from_unixtime(process_time,'HH') as `hour`,
>>>  a,
>>>  b,
>>>  c,
>>>  d,
>>>  e,
>>>  f,
>>>  g,
>>>  h,
>>>  i
>>> from kafka;
>>> 
>>> 
>>> 
>>> [1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit-trigger
> 

回复