yikf commented on pull request #3042:
URL: https://github.com/apache/hadoop/pull/3042#issuecomment-848412623


   > Thanks for including me, @steveloughran
   > 
   > Let me first understand the problem: unless new reference object is 
available in the queue (Java code calling `enqueue()`), those existing 
references will not be cleaned up forever. That is because when calling 
`remove()`, the `StatisticsDataReferenceCleaner` thread will wait forever in 
case there is no notify/notifyAll events upon the internal queue lock.
   > 
   > To fix the problem, here we propose to call `remove(timeout)` version in 
the `StatisticsDataReferenceCleaner` thread. Its timeout value will be honored 
when waiting for internal queue lock. That will give the cleaner thread an 
opportunity to dequeue periodically - instead of getting blocked forever if no 
notify event happens to the internal queue lock. Eventually, all reference 
object in the queue will get cleaned up by cleaner with this mechanism.
   > 
   > That makes sense to me, if I understand the problem and solution 
correctly. Let me know @yikf
   > 
   > As to implementation, I agree 100s might be too stingy to this cleanup (we 
remove one every time, so essentially 100s to cleanup one at best). I'm also 
wondering if 100ms is too generous here. How many threads do we target here? To 
my best knowledge, 1K is pretty large and close to the upper limit. To cleanup 
everything eventually AND without any help of enqueue events, it takes 10min to 
cleanup everything, if the timeout is 600ms. Is this a reasonable value?
   > 
   > I see you refer to Spark settings, but I assume that is targeting much 
more references including RDD, shuffle, and broadcast state etc?
   
   Thanks for review, In fact, If there are reference objects in 
`ReferenceQueue`, `ReferenceQueue.remove` will be returned directly without 
`wait`.
   
   reference JDK Code snippet
   ```
   public Reference<? extends T> remove(long timeout)
           throws IllegalArgumentException, InterruptedException
       {
           if (timeout < 0) {
               throw new IllegalArgumentException("Negative timeout value");
           }
           synchronized (lock) {
               Reference<? extends T> r = reallyPoll();
               if (r != null) return r;
               long start = (timeout == 0) ? 0 : System.nanoTime();
               for (;;) {
                   lock.wait(timeout);
   …
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org

Reply via email to