Hi ,kandy 我没有基于partition time 提交分区,我是基于默认的process time,所以是可以多次提交分区的,我知道在当前分区内的乱序数据可以提交,但是有延迟时间比较长的数据(比如上面的例子)是否还能被提交到对应分区
> 2020年11月12日 下午12:46,kandy.wang <kandy1...@163.com> 写道: > > hi: > 按照我的理解,partition time提交分区,是会在current watermark > partition time + commit > delay 时机触发分区提交,得看你的sink.partition-commit.delay > 设置的多久,如果超过之后,应当默认是会丢弃的吧。 > > > https://cloud.tencent.com/developer/article/1707182 > > 这个连接可以看一下 > > > > > > > > 在 2020-11-12 11:58:22,"admin" <17626017...@163.com> 写道: >> Hi,all >> Flink 1.11的filesystem connector,partition trigger[1]都是使用的默认值,所以分区可以多次提交 >> 现在有这样的场景: >> 消费kafka数据写入hdfs中,分区字段是 day + hour ,是从事件时间截取出来的,如果数据延迟了,比如现在是19点了,来了17点的数据, >> 这条数据还能正确的写到17点分区里面吗?还是写到19点分区?还是会被丢弃? >> 有大佬知道吗,有实际验证过吗 >> 感谢 >> >> 附上简单sql: >> CREATE TABLE kafka ( >> a STRING, >> b STRING, >> c BIGINT, >> process_time BIGINT, >> e STRING, >> f STRING, >> g STRING, >> h INT, >> i STRING >> ) WITH ( >> 'connector' = 'kafka', >> 'topic' = 'topic', >> 'properties.bootstrap.servers' = 'x', >> 'properties.group.id' = 'test-1', >> 'scan.startup.mode' = 'latest-offset', >> 'format' = 'json', >> 'properties.flink.partition-discovery.interval-millis' = '300000' >> ); >> >> CREATE TABLE filesystem ( >> `day` STRING, >> `hour` STRING, >> a STRING, >> b STRING, >> c BIGINT, >> d BIGINT, >> e STRING, >> f STRING, >> g STRING, >> h INT, >> i STRING >> ) PARTITIONED BY (`day`, `hour`) WITH ( >> 'connector' = 'filesystem', >> 'format' = 'parquet', >> 'path' = 'hdfs://xx', >> 'parquet.compression'='SNAPPY', >> 'sink.partition-commit.policy.kind' = 'success-file' >> ); >> >> insert into filesystem >> select >> from_unixtime(process_time,'yyyy-MM-dd') as `day`, >> from_unixtime(process_time,'HH') as `hour`, >> a, >> b, >> c, >> d, >> e, >> f, >> g, >> h, >> i >> from kafka; >> >> >> >> [1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit-trigger