[ 
https://issues.apache.org/jira/browse/YARN-10178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17455323#comment-17455323
 ] 

Andras Gyori commented on YARN-10178:
-------------------------------------

We have faced the same issue in a production cluster recently. I agree with 
[~epayne] that this should be resolved as soon as possible. My feedback on the 
patch:
 * As this is a subtle concurrency issue, I have not been able to reproduce it 
yet, but I was wondering whether we could avoid creating the snapshot 
altogether, by modifying the original comparator to acquire the necessary 
values immediately, thus hopefully eliminating the possibility of violating the 
sorting's requirements. It would look like the following:

{code:java}
float q1AbsCapacity = q1.getQueueCapacities().getAbsoluteCapacity(p);
float q2AbsCapacity = q2.getQueueCapacities().getAbsoluteCapacity(p);
float q1AbsUsedCapacity = q1.getQueueCapacities().getAbsoluteUsedCapacity(p);
float q2AbsUsedCapacity = q2.getQueueCapacities().getAbsoluteUsedCapacity(p); 
float q1UsedCapacity = q1.getQueueCapacities().getUsedCapacity(p);
float q2UsedCapacity = q2.getQueueCapacities().getUsedCapacity(p); 
.....{code}
 

 * We should not use the Stream API, because the fix may need to be backported 
to older branches. I suggest rewriting getAssignmentIterator:
{code:java}
@Override
public Iterator<CSQueue> getAssignmentIterator(String partition) {
  // partitionToLookAt is a thread-local variable and the queues are copied
  // and sorted on every call, so this is safe in a multi-threaded environment.
  PriorityUtilizationQueueOrderingPolicy.partitionToLookAt.set(partition);

  // Sort the snapshots instead of the queues directly, due to race conditions
  // See YARN-10178 for more information.
  List<QueueSnapshot> queueSnapshots = new ArrayList<>();
  for (CSQueue queue : queues) {
    queueSnapshots.add(new QueueSnapshot(queue));
  }
  queueSnapshots.sort(new PriorityQueueComparator());

  List<CSQueue> sortedQueues = new ArrayList<>();
  for (QueueSnapshot queueSnapshot : queueSnapshots) {
    sortedQueues.add(queueSnapshot.queue);
  }

  return sortedQueues.iterator();
} {code}

 * We do not need to keep the old logic.
 * Measuring performance is a delicate procedure. Including it in a unit test 
makes the test highly volatile, especially when naive time measurement is 
involved (on my local machine, for example, I have not been able to get the 
test to pass). I am not sure we can easily reproduce the issue, but in this 
case I think no test is better than a potentially intermittent one.
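
To make the snapshot idea from the getAssignmentIterator suggestion concrete, here is a minimal, self-contained sketch. It assumes a hypothetical MutableQueue class in place of CSQueue and a QueueSnapshot that copies a single usedCapacity field; the QueueSnapshot in the actual patch is not shown in this comment and may differ. Like the suggestion above, it avoids the Stream API and lambdas.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SnapshotSortSketch {

  /** Hypothetical stand-in for CSQueue: usage may change while we sort. */
  static class MutableQueue {
    final String name;
    volatile float usedCapacity;

    MutableQueue(String name, float usedCapacity) {
      this.name = name;
      this.usedCapacity = usedCapacity;
    }
  }

  /** Immutable copy of the value the comparator needs, read exactly once. */
  static final class QueueSnapshot {
    final MutableQueue queue;
    final float usedCapacity;

    QueueSnapshot(MutableQueue queue) {
      this.queue = queue;
      this.usedCapacity = queue.usedCapacity;
    }
  }

  /** Sorts by the frozen values, then maps back to the live queues. */
  static List<MutableQueue> sortBySnapshot(List<MutableQueue> queues) {
    List<QueueSnapshot> snapshots = new ArrayList<>();
    for (MutableQueue queue : queues) {
      snapshots.add(new QueueSnapshot(queue));
    }
    // The comparator only sees fields that can no longer change.
    snapshots.sort(new Comparator<QueueSnapshot>() {
      @Override
      public int compare(QueueSnapshot a, QueueSnapshot b) {
        return Float.compare(a.usedCapacity, b.usedCapacity);
      }
    });
    List<MutableQueue> sorted = new ArrayList<>();
    for (QueueSnapshot snapshot : snapshots) {
      sorted.add(snapshot.queue);
    }
    return sorted;
  }

  public static void main(String[] args) {
    List<MutableQueue> queues = new ArrayList<>();
    queues.add(new MutableQueue("a", 0.7f));
    queues.add(new MutableQueue("b", 0.2f));
    queues.add(new MutableQueue("c", 0.5f));
    for (MutableQueue q : sortBySnapshot(queues)) {
      System.out.println(q.name); // prints b, c, a on separate lines
    }
  }
}
```

Because each value is read once per queue before sorting starts, a concurrent update can make the resulting order slightly stale, but it cannot make the comparator contradict itself during the sort.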

> Global Scheduler async thread crash caused by 'Comparison method violates its 
> general contract'
> -----------------------------------------------------------------------------------------------
>
>                 Key: YARN-10178
>                 URL: https://issues.apache.org/jira/browse/YARN-10178
>             Project: Hadoop YARN
>          Issue Type: Bug
>          Components: capacity scheduler
>    Affects Versions: 3.2.1
>            Reporter: tuyu
>            Assignee: Qi Zhu
>            Priority: Major
>         Attachments: YARN-10178.001.patch, YARN-10178.002.patch, 
> YARN-10178.003.patch, YARN-10178.004.patch, YARN-10178.005.patch
>
>
> Stack trace of the Global Scheduler async thread crash:
> {code:java}
> ERROR org.apache.hadoop.yarn.server.resourcemanager.ResourceManager: Received 
> RMFatalEvent of type CRITICAL_THREAD_CRASH, caused by a critical thread, 
> Thread-6066574, that exited unexpectedly: java.lang.IllegalArgumentException: 
> Comparison method violates its general contract!
>         at java.util.TimSort.mergeHi(TimSort.java:899)
>         at java.util.TimSort.mergeAt(TimSort.java:516)
>         at java.util.TimSort.mergeForceCollapse(TimSort.java:457)
>         at java.util.TimSort.sort(TimSort.java:254)
>         at java.util.Arrays.sort(Arrays.java:1512)
>         at java.util.ArrayList.sort(ArrayList.java:1462)
>         at java.util.Collections.sort(Collections.java:177)
>         at org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.PriorityUtilizationQueueOrderingPolicy.getAssignmentIterator(PriorityUtilizationQueueOrderingPolicy.java:221)
>         at org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue.sortAndGetChildrenAllocationIterator(ParentQueue.java:777)
>         at org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue.assignContainersToChildQueues(ParentQueue.java:791)
>         at org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue.assignContainers(ParentQueue.java:623)
>         at org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler.allocateOrReserveNewContainers(CapacityScheduler.java:1635)
>         at org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler.allocateContainerOnSingleNode(CapacityScheduler.java:1629)
>         at org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler.allocateContainersToNode(CapacityScheduler.java:1732)
>         at org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler.allocateContainersToNode(CapacityScheduler.java:1481)
>         at org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler.schedule(CapacityScheduler.java:569)
>         at org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler$AsyncScheduleThread.run(CapacityScheduler.java:616)
> {code}
> In Java 8, Arrays.sort uses TimSort by default for object arrays, and TimSort 
> relies on the comparison meeting a few requirements:
> {code:java}
> 1. sgn(x.compareTo(y)) == -sgn(y.compareTo(x))
> 2. x > y and y > z  -->  x > z
> 3. x.compareTo(y) == 0  -->  sgn(x.compareTo(z)) == sgn(y.compareTo(z))
> {code}
> If the comparison does not satisfy these requirements, TimSort can throw 
> 'java.lang.IllegalArgumentException'.
> Looking at the PriorityUtilizationQueueOrderingPolicy.compare function, we can 
> see that the Capacity Scheduler compares queues using these resource usage 
> values:
> {code:java}
> AbsoluteUsedCapacity
> UsedCapacity
> ConfiguredMinResource
> AbsoluteCapacity
> {code}
> In the Capacity Scheduler, the Global Scheduler async thread uses 
> PriorityUtilizationQueueOrderingPolicy to choose the queue to assign a 
> container to, constructs a CSAssignment, and adds it to the backlog through 
> submitResourceCommitRequest.
> ResourceCommitterService then calls tryCommit on this CSAssignment. Looking at 
> tryCommit, we can see that it updates the queue resource usage:
> {code:java}
> public boolean tryCommit(Resource cluster, ResourceCommitRequest r,
>     boolean updatePending) {
>   long commitStart = System.nanoTime();
>   ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request =
>       (ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode>) r;
>  
>   ...
>   boolean isSuccess = false;
>   if (attemptId != null) {
>     FiCaSchedulerApp app = getApplicationAttempt(attemptId);
>     // Required sanity check for attemptId - when async-scheduling enabled,
>     // proposal might be outdated if AM failover just finished
>     // and proposal queue was not be consumed in time
>     if (app != null && attemptId.equals(app.getApplicationAttemptId())) {
>       // apply this resource
>       if (app.accept(cluster, request, updatePending)
>           && app.apply(cluster, request, updatePending)) {
>         ...
>       }
>     }
>   }
>   return isSuccess;
> }
> }
> {code}
> {code:java}
> public boolean apply(Resource cluster, ResourceCommitRequest<FiCaSchedulerApp,
>     FiCaSchedulerNode> request, boolean updatePending) {
> ...
>     if (!reReservation) {
>         getCSLeafQueue().apply(cluster, request); 
>     }
> ...
> }
> {code}
> LeafQueue.apply invokes allocateResource:
> {code:java}
> void allocateResource(Resource clusterResource,
>     Resource resource, String nodePartition) {
>   try {
>     writeLock.lock(); // only the leaf queue's lock is taken
>     queueUsage.incUsed(nodePartition, resource);
>  
>     ++numContainers;
>  
>     // the queue statistics are updated here
>     CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
>         this, labelManager, nodePartition);
>   } finally {
>     writeLock.unlock();
>   }
> }
> {code}
> We found that ResourceCommitterService takes only the leaf queue's lock when 
> it updates queue statistics, while the async thread, in 
> sortAndGetChildrenAllocationIterator, takes only the read lock of the parent 
> queue (e.g. root):
> {code:java}
> // ParentQueue.java
> private Iterator<CSQueue> sortAndGetChildrenAllocationIterator(
>       String partition) {
>     try {
>       readLock.lock();
>       return queueOrderingPolicy.getAssignmentIterator(partition);
>     } finally {
>       readLock.unlock();
>     }
>   }
> {code}
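> So if multiple async threads compare queue usage statistics while 
> ResourceCommitterService concurrently applies changes to the leaf queue 
> statistics, the values seen by the comparator can change in the middle of a 
> sort. This breaks TimSort's requirements and causes the thread to crash.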
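
The contract violation described in the issue can be illustrated deterministically, without threads. This is only a sketch under stated assumptions: MutableQueue is a hypothetical stand-in for a queue whose statistics are read live by the comparator, and the concurrent commit is simulated by a plain assignment between two comparisons of the same pair.

```java
import java.util.Comparator;

public class ComparatorContractSketch {

  /** Hypothetical queue whose usage another thread could update. */
  static class MutableQueue {
    volatile float usedCapacity;

    MutableQueue(float usedCapacity) {
      this.usedCapacity = usedCapacity;
    }
  }

  /** Reads the live field on every call, like the original comparator. */
  static final Comparator<MutableQueue> BY_LIVE_USAGE =
      new Comparator<MutableQueue>() {
        @Override
        public int compare(MutableQueue a, MutableQueue b) {
          return Float.compare(a.usedCapacity, b.usedCapacity);
        }
      };

  /** True if compare(a, b) and compare(b, a) have opposite signs. */
  static boolean isAntisymmetric(int ab, int ba) {
    return Integer.signum(ab) == -Integer.signum(ba);
  }

  public static void main(String[] args) {
    MutableQueue q1 = new MutableQueue(0.3f);
    MutableQueue q2 = new MutableQueue(0.5f);

    int first = BY_LIVE_USAGE.compare(q1, q2);  // q1 < q2
    // Simulate a commit landing between two comparisons of the same pair.
    q1.usedCapacity = 0.9f;
    int second = BY_LIVE_USAGE.compare(q2, q1); // now q2 < q1

    // Both calls claim "first argument is smaller", so the check fails.
    System.out.println(isAntisymmetric(first, second)); // prints false
  }
}
```

TimSort does not check these properties on every comparison, which is consistent with the crash being intermittent: the exception is raised only when an inconsistency happens to surface during a merge step.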


