zentol opened a new pull request #12264:
URL: https://github.com/apache/flink/pull/12264


   With this PR the local release of partitions in the 
`NettyShuffleEnvironment` is executed asynchronously. This primarily includes 
the deletion of files. We have seen instances where this operation was taking a 
long time, blocking the TaskExecutor main thread.
   
   For this we are re-using the existing `ioExecutor` from the `TaskExecutor` 
and also passing it into the `NettyShuffleEnvironment`. So far this executor 
was only used for handling log file requests.
   The number of threads of this executor was increased from 1 to 2.
   Given that at most 1 thread should be busy handling log file requests (?), 
we should have at least 1 thread processing the partition releases, which 
should be sufficient as we so far only used 1 thread as well (a busy one at 
that).
   We still want these partitions to be cleaned up in a timely fashion; not for 
correctness, but to reduce disk usage.
   There is a new (hidden) option for increasing this thread count, but ideally 
users should never have to use it.
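   The sizing argument above can be illustrated with a small sketch. It is not the code from this PR; the class and method names are hypothetical, and a plain `Executors.newFixedThreadPool` stands in for the `ioExecutor`. One task simulates a log file request that blocks, and a second task simulates a partition release.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only; none of these names exist in Flink.
final class IoExecutorSizingSketch {

    /**
     * Returns true if a simulated partition release finishes within 500 ms
     * while a simulated log file request keeps one pool thread blocked.
     */
    static boolean releaseCompletesWhileLogRequestBlocks(int threads) {
        ExecutorService ioExecutor = Executors.newFixedThreadPool(threads);
        CountDownLatch logRequestMayFinish = new CountDownLatch(1);
        try {
            // Stand-in for a slow log file request occupying one thread.
            ioExecutor.submit(() -> {
                logRequestMayFinish.await();
                return null;
            });
            // Stand-in for the partition release (e.g. file deletion).
            Future<?> release = ioExecutor.submit(() -> { });
            try {
                release.get(500, TimeUnit.MILLISECONDS);
                return true;
            } catch (Exception e) {
                // Timed out: the release is stuck behind the log request.
                return false;
            }
        } finally {
            logRequestMayFinish.countDown();
            ioExecutor.shutdownNow();
        }
    }
}
```

   With a pool of 1 thread the release waits behind the blocked request; with 2 threads it proceeds independently, which is the situation the increased thread count is meant to guarantee.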
   
   There are ultimately 3 places where we could introduce the asynchronous 
behavior: in the TaskExecutor, PartitionTracker or ShuffleEnvironment.
   
   Doing this in the TaskExecutor would imply doing all operations on the 
PartitionTracker with the ioExecutor, since the tracker requires 
single-threaded access. This would add complexity where we want it the least, 
and could easily lead to bugs since we start switching back and forth between 
executors.
   
   The PartitionTracker is a good candidate; it can selectively use the 
executor for the release call into the ShuffleEnvironment, and would guard the 
TaskExecutor against such blocking calls regardless of ShuffleEnvironment 
implementations. So far, however, the tracker has not been aware of any 
asynchronous operations, and there may be value in keeping it that way.
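   For comparison, the PartitionTracker variant (which this PR does not implement) could look roughly like the sketch below. All names are hypothetical and the real tracker interfaces differ; the point is only that bookkeeping stays on the calling thread while the potentially blocking release call is handed to the executor.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Hypothetical stand-in for a possibly blocking release call into the shuffle environment. */
interface BlockingReleaser {
    void releasePartitionsLocally(Set<String> partitionIds);
}

/** Hypothetical tracker; it is meant to be accessed from a single (main) thread only. */
final class OffloadingTrackerSketch {
    private final Map<String, Set<String>> partitionsByJob = new HashMap<>();
    private final BlockingReleaser releaser;
    private final Executor ioExecutor;

    OffloadingTrackerSketch(BlockingReleaser releaser, Executor ioExecutor) {
        this.releaser = releaser;
        this.ioExecutor = ioExecutor;
    }

    void startTracking(String jobId, String partitionId) {
        partitionsByJob.computeIfAbsent(jobId, k -> new HashSet<>()).add(partitionId);
    }

    /** Bookkeeping runs on the caller's thread; only the release itself is offloaded. */
    CompletableFuture<Void> stopTrackingAndRelease(String jobId) {
        Set<String> partitions = partitionsByJob.remove(jobId);
        if (partitions == null) {
            return CompletableFuture.completedFuture(null);
        }
        return CompletableFuture.runAsync(
                () -> releaser.releasePartitionsLocally(partitions), ioExecutor);
    }
}
```

   This guards the caller regardless of how the environment implements the release, at the cost of making the tracker aware of an executor.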
   
   For this PR I opted for the ShuffleEnvironment. It seems reasonable that the 
local release should be a non-blocking operation; any ShuffleEnvironment that 
communicates to the outside will necessarily have to do some non-blocking 
operations, and being provided an Executor through the 
ShuffleEnvironmentContext may make this easier, although it has the downside 
that the ShuffleEnvironment may over-use it and potentially starve other 
users of the executor. On the flip-side, this implies that we only use the 
executor when necessary; for a ShuffleEnvironment that implements the local 
release in a non-blocking fashion the safeguard in the PartitionTracker would 
just be a waste of resources.
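   A minimal sketch of the chosen approach, assuming the environment simply receives an `Executor` in its constructor (in the PR it arrives through the ShuffleEnvironmentContext). The class name and the representation of partitions as file paths are assumptions made for illustration, not the actual `NettyShuffleEnvironment` code.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Executor;

// Hypothetical illustration only; not the NettyShuffleEnvironment implementation.
final class AsyncReleaseEnvironmentSketch {
    private final Executor ioExecutor;

    AsyncReleaseEnvironmentSketch(Executor ioExecutor) {
        this.ioExecutor = ioExecutor;
    }

    /** Returns immediately; the file deletion happens on the ioExecutor. */
    void releasePartitionsLocally(Collection<Path> partitionFiles) {
        // Copy so that later changes by the caller do not affect the async task.
        List<Path> snapshot = new ArrayList<>(partitionFiles);
        ioExecutor.execute(() -> {
            for (Path file : snapshot) {
                try {
                    Files.deleteIfExists(file);
                } catch (IOException e) {
                    // Cleanup is best-effort: it reduces disk usage but is
                    // not needed for correctness, so only report the failure.
                    System.err.println("Could not delete " + file + ": " + e);
                }
            }
        });
    }
}
```

   A caller on the main thread only pays for copying the collection and enqueueing the task; an environment whose release is already non-blocking would not need the executor at all.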
   
   One thing left to do is adding a test to ensure a blocking partition release 
is not blocking the TaskExecutor main thread.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

