Hi my friends,
    I am using the FileSystem connector in Flink SQL, and I have found that my
success files are committed late, often by dozens of minutes.
    Here is some information about my setup:
    1. Flink version: 1.11.1.
    2. Source DDL:
       create table test (
          `timestamp` bigint,
          event_time as _timestamp(`timestamp`),
          WATERMARK FOR event_time AS event_time - INTERVAL '10' MINUTE
       )...
    3. Sink DDL:
       create table sinkTest (
          xxx
          dtm VARCHAR,
          hh VARCHAR
       ) PARTITIONED BY (dtm, hh)
       with (
          'connector' = 'filesystem',
          'format' = 'parquet',
          'parquet.compression' = 'SNAPPY',
          'sink.rolling-policy.file-size' = '512MB',
          'sink.rolling-policy.check-interval' = '10 min',
          'sink.partition-commit.trigger' = 'partition-time',
          'sink.partition-commit.delay' = '1 h',
          'sink.partition-commit.policy.kind' = 'success-file',
          'sink.file-suffix' = '.parquet',
          'partition.time-extractor.timestamp-pattern' = '$dtm $hh:00:00'
       )


    4. The checkpoint interval is 5 minutes, and all checkpoints complete
       successfully.
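To make the watermark clause in item 2 concrete, here is a minimal plain-Java model of a bounded-delay watermark: each event's time minus 10 minutes, keeping the maximum seen so far so the watermark never moves backwards. This is a sketch, not Flink's API; the class and method names are hypothetical, and it ignores periodic watermark emission and the fact that an operator's watermark is the minimum across its parallel inputs.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Hypothetical model of: WATERMARK FOR event_time AS event_time - INTERVAL '10' MINUTE
// Not Flink's API; it ignores periodic emission and multiple parallel inputs.
public class BoundedDelayWatermark {
    private final long delayMillis;
    private long maxEventTime = Long.MIN_VALUE;

    BoundedDelayWatermark(Duration delay) {
        this.delayMillis = delay.toMillis();
    }

    // Observe one event (epoch millis) and return the watermark after it.
    long onEvent(long eventTimeMillis) {
        maxEventTime = Math.max(maxEventTime, eventTimeMillis);
        return currentWatermark();
    }

    // Largest event time seen so far minus the delay; never moves backwards.
    long currentWatermark() {
        return maxEventTime == Long.MIN_VALUE ? Long.MIN_VALUE : maxEventTime - delayMillis;
    }

    // Helper: interpret an ISO local date-time as UTC epoch millis.
    static long millis(String isoLocalDateTime) {
        return LocalDateTime.parse(isoLocalDateTime).toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    public static void main(String[] args) {
        BoundedDelayWatermark wm = new BoundedDelayWatermark(Duration.ofMinutes(10));
        wm.onEvent(millis("2020-08-18T11:05:00"));
        wm.onEvent(millis("2020-08-18T11:02:00")); // an out-of-order event does not lower it
        System.out.println(Instant.ofEpochMilli(wm.currentWatermark())); // 2020-08-18T10:55:00Z
    }
}
```

In other words, with this definition the watermark trails the newest event time by 10 minutes, so any lag in the source is added directly to the commit time.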


I expected that, as long as my job is not lagging, a success file would be
committed at around 10 minutes past each hour. In practice, it is committed much later.
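The expectation above can be checked with a back-of-the-envelope calculation. This sketch (plain Java, hypothetical names) assumes event time closely tracks wall-clock time, that the partition time is the start of the hour ('$dtm $hh:00:00'), and that a commit happens on the first checkpoint completing after the condition holds, since committablePartitions is evaluated per checkpoint id. With a 1 h commit delay, a 10 minute watermark lag and 5 minute checkpoints, the partition for hh=10 should become committable once event time passes 11:10 and be committed by roughly 11:15.

```java
import java.time.Duration;
import java.time.LocalDateTime;

// Hypothetical back-of-the-envelope model; not part of Flink.
public class ExpectedCommitTime {

    // Event time the job must pass before watermark > partTime + commitDelay,
    // given watermark = max event time - watermarkLag.
    static LocalDateTime committableAfter(LocalDateTime partTime, Duration commitDelay,
                                          Duration watermarkLag) {
        return partTime.plus(commitDelay).plus(watermarkLag);
    }

    // Upper bound on the commit time, assuming the commit happens on the next
    // checkpoint and that checkpoints complete instantly (a simplification).
    static LocalDateTime latestExpectedCommit(LocalDateTime partTime, Duration commitDelay,
                                              Duration watermarkLag, Duration checkpointInterval) {
        return committableAfter(partTime, commitDelay, watermarkLag).plus(checkpointInterval);
    }

    public static void main(String[] args) {
        LocalDateTime partTime = LocalDateTime.of(2020, 8, 18, 10, 0); // example partition, hh=10
        Duration delay = Duration.ofHours(1);         // 'sink.partition-commit.delay'
        Duration lag = Duration.ofMinutes(10);        // INTERVAL '10' MINUTE
        Duration checkpoint = Duration.ofMinutes(5);  // checkpoint interval
        System.out.println("committable after " + committableAfter(partTime, delay, lag));
        System.out.println("expected commit by " + latestExpectedCommit(partTime, delay, lag, checkpoint));
    }
}
```

A commit arriving dozens of minutes after this bound would then point to event time (and therefore the watermark) lagging behind wall-clock time, or to checkpoints taking longer than assumed.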
Here is the relevant source code for committing partitions. A partition is
committed, and its success file written, only when the watermark is greater
than the partition time plus the commit delay:
public List<String> committablePartitions(long checkpointId) {
    if (!watermarks.containsKey(checkpointId)) {
        throw new IllegalArgumentException(String.format(
                "Checkpoint(%d) has not been snapshot. The watermark information is: %s.",
                checkpointId, watermarks));
    }

    // Watermark recorded when this checkpoint was snapshotted;
    // then discard it together with all older entries.
    long watermark = watermarks.get(checkpointId);
    watermarks.headMap(checkpointId, true).clear();

    List<String> needCommit = new ArrayList<>();
    Iterator<String> iter = pendingPartitions.iterator();
    while (iter.hasNext()) {
        String partition = iter.next();
        // Partition time extracted from the partition path values.
        LocalDateTime partTime = extractor.extract(
                partitionKeys, extractPartitionValues(new Path(partition)));
        // Commit only once the watermark has passed partition time + commit delay.
        if (watermark > toMills(partTime) + commitDelay) {
            needCommit.add(partition);
            iter.remove();
        }
    }
    return needCommit;
}
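To reproduce the commit condition outside a running job, here is a self-contained, simplified re-implementation of the method above. It is a sketch, not Flink's code: the class name, the snapshot/addPartition helpers and the path layout "dtm=yyyy-MM-dd/hh=HH" (with zero-padded hours) are assumptions, and the extracted partition time is converted to epoch milliseconds as if it were UTC. It shows that a watermark exactly equal to partition time + delay does not commit yet, because the comparison is strict.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical, simplified model of the partition-time commit check; not Flink code.
public class CommitTriggerSketch {
    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    private final TreeMap<Long, Long> watermarks = new TreeMap<>(); // checkpointId -> watermark
    private final Set<String> pendingPartitions = new LinkedHashSet<>();
    private final String timestampPattern;
    private final long commitDelayMillis;

    CommitTriggerSketch(String timestampPattern, long commitDelayMillis) {
        this.timestampPattern = timestampPattern;
        this.commitDelayMillis = commitDelayMillis;
    }

    void addPartition(String partition) { pendingPartitions.add(partition); }

    // Record the watermark observed when a checkpoint is snapshotted.
    void snapshot(long checkpointId, long watermark) { watermarks.put(checkpointId, watermark); }

    // Substitute values from an assumed path like "dtm=2020-08-18/hh=10" into the
    // pattern, parse the result, and convert it to epoch millis as if it were UTC.
    long partitionTimeMillis(String partition) {
        String ts = timestampPattern;
        for (String kv : partition.split("/")) {
            String[] pair = kv.split("=", 2);
            ts = ts.replace("$" + pair[0], pair[1]);
        }
        return LocalDateTime.parse(ts, FORMAT).toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    List<String> committablePartitions(long checkpointId) {
        Long watermark = watermarks.get(checkpointId);
        if (watermark == null) {
            throw new IllegalArgumentException("Checkpoint(" + checkpointId + ") has not been snapshot.");
        }
        watermarks.headMap(checkpointId, true).clear();

        List<String> needCommit = new ArrayList<>();
        Iterator<String> iter = pendingPartitions.iterator();
        while (iter.hasNext()) {
            String partition = iter.next();
            if (watermark > partitionTimeMillis(partition) + commitDelayMillis) {
                needCommit.add(partition);
                iter.remove();
            }
        }
        return needCommit;
    }

    static long millis(String isoLocalDateTime) {
        return LocalDateTime.parse(isoLocalDateTime).toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    public static void main(String[] args) {
        CommitTriggerSketch trigger = new CommitTriggerSketch("$dtm $hh:00:00", 3_600_000L); // 1 h delay
        trigger.addPartition("dtm=2020-08-18/hh=10");
        trigger.addPartition("dtm=2020-08-18/hh=11");

        trigger.snapshot(1, millis("2020-08-18T11:00:00")); // equal to partTime + delay
        System.out.println(trigger.committablePartitions(1)); // []

        trigger.snapshot(2, millis("2020-08-18T11:00:01"));
        System.out.println(trigger.committablePartitions(2)); // [dtm=2020-08-18/hh=10]
    }
}
```

Feeding in the watermarks actually observed at each checkpoint (for example from the Web UI) would show at which checkpoint a given partition first becomes committable.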
Best,
Forideal

Reply via email to