zentol opened a new pull request #12264: URL: https://github.com/apache/flink/pull/12264
With this PR, the local release of partitions in the `NettyShuffleEnvironment` is executed asynchronously. This primarily concerns the deletion of files. We have seen instances where this operation took a long time, blocking the TaskExecutor main thread.

For this we re-use the existing `ioExecutor` from the `TaskExecutor` and also pass it into the `NettyShuffleEnvironment`. So far this executor was only used for handling log file requests. Its number of threads was increased from 1 to 2. Given that at most 1 thread should be busy handling log file requests (?), we should have at least 1 thread processing the partition releases, which should be sufficient since we previously used only 1 thread as well (a busy one at that). We still want these partitions to be cleaned up in a timely fashion; not for correctness, but to reduce disk usage. There is a new (hidden) option for increasing this thread count, but ideally users should never have to use it.

There are ultimately 3 places where we could introduce the asynchronous behavior: the TaskExecutor, the PartitionTracker, or the ShuffleEnvironment.

Doing this in the TaskExecutor would imply doing all operations on the PartitionTracker with the ioExecutor, since the tracker requires single-threaded access. This would add complexity where we want it the least, and could easily lead to bugs since we would start switching back and forth between executors.

The PartitionTracker is a good candidate; it can selectively use the executor for the release call into the ShuffleEnvironment, and would guard the TaskExecutor against such blocking calls regardless of the ShuffleEnvironment implementation. So far, however, the tracker has not been aware of any asynchronous behavior, and there may be value in keeping it that way.

For this PR I opted for the ShuffleEnvironment.
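As a rough illustration of that choice, here is a minimal sketch of a release call that hands the blocking file deletion to an I/O executor and returns immediately. It is not the code from this PR: `AsyncReleaseSketch`, `Partition`, `deleteFiles`, and the signature of `releasePartitionsLocally` are hypothetical stand-ins, and the pool size of 2 only mirrors the thread count mentioned above.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a shuffle environment; not Flink's actual class. */
public class AsyncReleaseSketch {

    /** Hypothetical handle for a partition's on-disk data. */
    interface Partition {
        void deleteFiles(); // may block for a long time
    }

    private final Executor ioExecutor;

    AsyncReleaseSketch(Executor ioExecutor) {
        this.ioExecutor = ioExecutor;
    }

    /** Returns immediately; the blocking deletion runs on the I/O executor. */
    void releasePartitionsLocally(List<Partition> partitions) {
        ioExecutor.execute(() -> {
            for (Partition partition : partitions) {
                partition.deleteFiles();
            }
        });
    }

    /**
     * Returns true if the caller regained control while the deletion was
     * still blocked, and the deletion completed once it was unblocked.
     */
    public static boolean demo() throws InterruptedException {
        // Assumption: 2 threads, mirroring the count described in the PR text.
        ExecutorService ioExecutor = Executors.newFixedThreadPool(2);
        CountDownLatch allowDeletion = new CountDownLatch(1);
        CountDownLatch deleted = new CountDownLatch(1);
        try {
            AsyncReleaseSketch environment = new AsyncReleaseSketch(ioExecutor);

            // Simulates a slow deletion: blocks until the latch is opened.
            Partition slowPartition = () -> {
                try {
                    allowDeletion.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                deleted.countDown();
            };

            environment.releasePartitionsLocally(
                    Collections.singletonList(slowPartition));

            // The call has returned although the deletion cannot have finished.
            boolean returnedBeforeDeletion = deleted.getCount() == 1;

            allowDeletion.countDown();
            boolean eventuallyDeleted = deleted.await(5, TimeUnit.SECONDS);
            return returnedBeforeDeletion && eventuallyDeleted;
        } finally {
            ioExecutor.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("caller not blocked: " + demo());
    }
}
```

The latch-based `demo` also hints at how a test for the non-blocking behavior could be structured: keep the deletion blocked, check that the calling thread has already moved on, then unblock it and wait for completion.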
It seems reasonable that the local release should be a non-blocking operation; any ShuffleEnvironment that communicates with the outside will necessarily have to do some non-blocking operations, and being provided an Executor through the ShuffleEnvironmentContext may make this easier, although it has the downside that the ShuffleEnvironment may over-use it and potentially deny other usages of the executor. On the flip side, this implies that we only use the executor when necessary; for a ShuffleEnvironment that implements the local release in a non-blocking fashion, the safeguard in the PartitionTracker would just be a waste of resources.

One thing left to do is adding a test to ensure that a blocking partition release does not block the TaskExecutor main thread.