I finally found the time to dig a little more into this and found the real 
problem.
The culprit of the slow-down is this piece of code:
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L543-L551

This check alone takes around 4-5 secs, out of a total of about 6 secs to open 
the file. Logs from an instrumented call:
2020-02-07 08:51:05,825 INFO  BucketingSink  - openNewPartFile FS verification
2020-02-07 08:51:09,906 INFO  BucketingSink  - openNewPartFile FS verification - done
2020-02-07 08:51:11,181 INFO  BucketingSink  - openNewPartFile FS - completed partPath = s3a://....

This, together with the bucketing sink's default 60 sec inactivity rollover 
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L195
means that with more than 10 parallel buckets on a slot, by the time we finish 
creating the last bucket the first one has already become stale and needs to be 
rotated, which creates a blocking situation.

We solved this by deleting the FS check mentioned above (file opening now 
takes ~1.2 sec) and setting the default inactive threshold to 5 mins. With these 
changes we can easily handle more than 200 buckets per slot (once the job picks 
up speed it ingests on all the slots, so the inactive timeout keeps being 
postponed).
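
To make the arithmetic behind those numbers explicit, here is a rough sketch. 
It is only a back-of-the-envelope model, not code from the sink: the class and 
method names are made up for illustration, and it assumes that part files on a 
slot are opened strictly one after another and that a bucket counts as stale 
once the threshold has elapsed since it was opened.

```java
// Hypothetical helper, not part of Flink: estimates how many buckets a slot
// can open back to back before the first one exceeds the inactivity threshold.
class BucketStallEstimate {

    // Number of sequential file opens that fit inside one inactivity window.
    static long maxBucketsBeforeStale(long inactiveThresholdMs, long openTimeMs) {
        if (inactiveThresholdMs < 0 || openTimeMs <= 0) {
            throw new IllegalArgumentException("threshold must be >= 0 and open time > 0");
        }
        return inactiveThresholdMs / openTimeMs;
    }

    public static void main(String[] args) {
        // Before: ~6 s per open (with the FS check), 60 s default threshold.
        System.out.println(maxBucketsBeforeStale(60_000L, 6_000L));
        // After: ~1.2 s per open (check removed), 5 min threshold.
        System.out.println(maxBucketsBeforeStale(300_000L, 1_200L));
    }
}
```

Under these assumptions the first case works out to 10 buckets and the second 
to 250, which lines up with the "more than 10" and "more than 200" figures 
above.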

-Enrico
