[ 
https://issues.apache.org/jira/browse/YARN-1889?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sandy Ryza updated YARN-1889:
-----------------------------

    Description: 
In fair scheduler, in each scheduling attempt, a full sort is
performed on List of AppSchedulable, which invokes Comparator.compare
method many times. Both FairShareComparator and DRFComparator call
AppSchedulable.getWeights, and AppSchedulable.getPriority.

A new ResourceWeights object is allocated on each call of getWeights,
and the same for getPriority. This introduces a lot of pressure to
GC because these methods are called very very frequently.

The test case below shows the improvement in performance and GC behaviour. The 
results show that the GC pressure during NodeUpdate processing is reduced by 
half with this patch.

The code to show the improvement: (Add it to TestFairScheduler.java)

{code}
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
  /**
   * Prints the garbage collection totals of the running JVM to standard
   * output: the summed collection count and the summed collection time (ms)
   * over every registered {@link GarbageCollectorMXBean}.
   *
   * <p>Negative readings are left out of the totals (the MXBean API reports
   * -1 when a figure is undefined for a collector).
   */
  public void printGCStats() {
    long collections = 0L;
    long collectionMillis = 0L;

    for (GarbageCollectorMXBean collector
        : ManagementFactory.getGarbageCollectorMXBeans()) {
      // Clamping at zero is the same as skipping a negative reading.
      collections += Math.max(0L, collector.getCollectionCount());
      collectionMillis += Math.max(0L, collector.getCollectionTime());
    }

    System.out.println("Total Garbage Collections: " + collections);
    System.out.println("Total Garbage Collection Time (ms): " + collectionMillis);
  }

  /**
   * Loads the scheduler with a large number of nodes and applications, then
   * prints GC statistics before and after a burst of node updates together
   * with the average time spent handling one update.
   *
   * <p>Only the setup is asserted (cluster memory, node count and the number
   * of runnable apps in queue1); the GC and timing figures are printed for
   * manual comparison and are not checked.
   */
  @Test
  public void testImpactOnGC() throws Exception {
    scheduler.reinitialize(conf, resourceManager.getRMContext());

    // Register the nodes one by one, checking the cluster memory as it grows.
    final int memoryPerNode = 1024 * 64;
    final int nodeCount = 10000;
    for (int n = 0; n < nodeCount; n++) {
      String hostName = String.format("192.1.%d.%d", n / 256, n % 256);
      RMNode addedNode = MockNodes.newNodeInfo(
          1, Resources.createResource(memoryPerNode), n, hostName);
      scheduler.handle(new NodeAddedSchedulerEvent(addedNode));
      assertEquals(memoryPerNode * (n + 1),
          scheduler.getClusterCapacity().getMemory());
    }
    assertEquals(nodeCount, scheduler.getNumClusterNodes());
    assertEquals(memoryPerNode * nodeCount,
        scheduler.getClusterCapacity().getMemory());

    // Submit the applications to queue1 and issue one scheduling request call
    // per application attempt.
    // NOTE(review): the original comment said "each app has 100 containers";
    // the visible call does not obviously request that many -- confirm.
    final int requestSize =
        FairSchedulerConfiguration.DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB * 2;
    final int appCount = 8000;
    final int requestPriority = 1;
    for (int appId = 1; appId <= appCount; appId++) {
      ApplicationAttemptId attempt = createAppAttemptId(appId, 1);
      scheduler.handle(new AppAddedSchedulerEvent(
          attempt.getApplicationId(), "queue1", "user1"));
      scheduler.handle(new AppAttemptAddedSchedulerEvent(attempt, false));
      createSchedulingRequestExistingApplication(
          requestSize, 1, requestPriority, attempt);
    }
    scheduler.update();

    assertEquals(appCount,
        scheduler.getQueueManager().getLeafQueue("queue1", true)
            .getRunnableAppSchedulables().size());

    System.out.println("GC stats before NodeUpdate processing:");
    printGCStats();

    // Time a burst of node update events; only this loop is measured.
    final int heartbeats = 5000;
    final long startNanos = System.nanoTime();
    for (int beat = 0; beat < heartbeats; beat++) {
      String hostName = String.format("192.1.%d.%d", beat / 256, beat % 256);
      RMNode updatedNode = MockNodes.newNodeInfo(
          1, Resources.createResource(memoryPerNode), 5000, hostName);
      scheduler.handle(new NodeUpdateSchedulerEvent(updatedNode));
    }
    final long elapsedNanos = System.nanoTime() - startNanos;

    // Dividing by (heartbeats * 1000) turns total nanoseconds into the
    // average number of microseconds per update.
    System.out.printf("processing time for a NodeUpdate in average: %d us\n",
        elapsedNanos / (heartbeats * 1000));

    System.out.println("GC stats after NodeUpdate processing:");
    printGCStats();
  }

{code}


  was:
In fair scheduler, in each scheduling attempt, a full sort is
performed on List of AppSchedulable, which invokes Comparator.compare
method many times. Both FairShareComparator and DRFComparator call
AppSchedulable.getWeights, and AppSchedulable.getPriority.

A new ResourceWeights object is allocated on each call of getWeights,
and the same for getPriority. This introduces a lot of pressure to
GC because these methods are called very very frequently.

The test case below shows the improvement in performance and GC behaviour. The 
results show that the GC pressure during NodeUpdate processing is reduced by 
half with this patch.

The code to show the improvement: (Add it to TestFairScheduler.java)

import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
  /**
   * Prints the garbage collection totals of the running JVM to standard
   * output: the summed collection count and the summed collection time (ms)
   * over every registered {@link GarbageCollectorMXBean}.
   *
   * <p>Negative readings are left out of the totals (the MXBean API reports
   * -1 when a figure is undefined for a collector).
   */
  public void printGCStats() {
    long collections = 0L;
    long collectionMillis = 0L;

    for (GarbageCollectorMXBean collector
        : ManagementFactory.getGarbageCollectorMXBeans()) {
      // Clamping at zero is the same as skipping a negative reading.
      collections += Math.max(0L, collector.getCollectionCount());
      collectionMillis += Math.max(0L, collector.getCollectionTime());
    }

    System.out.println("Total Garbage Collections: " + collections);
    System.out.println("Total Garbage Collection Time (ms): " + collectionMillis);
  }

  /**
   * Loads the scheduler with a large number of nodes and applications, then
   * prints GC statistics before and after a burst of node updates together
   * with the average time spent handling one update.
   *
   * <p>Only the setup is asserted (cluster memory, node count and the number
   * of runnable apps in queue1); the GC and timing figures are printed for
   * manual comparison and are not checked.
   */
  @Test
  public void testImpactOnGC() throws Exception {
    scheduler.reinitialize(conf, resourceManager.getRMContext());

    // Register the nodes one by one, checking the cluster memory as it grows.
    final int memoryPerNode = 1024 * 64;
    final int nodeCount = 10000;
    for (int n = 0; n < nodeCount; n++) {
      String hostName = String.format("192.1.%d.%d", n / 256, n % 256);
      RMNode addedNode = MockNodes.newNodeInfo(
          1, Resources.createResource(memoryPerNode), n, hostName);
      scheduler.handle(new NodeAddedSchedulerEvent(addedNode));
      assertEquals(memoryPerNode * (n + 1),
          scheduler.getClusterCapacity().getMemory());
    }
    assertEquals(nodeCount, scheduler.getNumClusterNodes());
    assertEquals(memoryPerNode * nodeCount,
        scheduler.getClusterCapacity().getMemory());

    // Submit the applications to queue1 and issue one scheduling request call
    // per application attempt.
    // NOTE(review): the original comment said "each app has 100 containers";
    // the visible call does not obviously request that many -- confirm.
    final int requestSize =
        FairSchedulerConfiguration.DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB * 2;
    final int appCount = 8000;
    final int requestPriority = 1;
    for (int appId = 1; appId <= appCount; appId++) {
      ApplicationAttemptId attempt = createAppAttemptId(appId, 1);
      scheduler.handle(new AppAddedSchedulerEvent(
          attempt.getApplicationId(), "queue1", "user1"));
      scheduler.handle(new AppAttemptAddedSchedulerEvent(attempt, false));
      createSchedulingRequestExistingApplication(
          requestSize, 1, requestPriority, attempt);
    }
    scheduler.update();

    assertEquals(appCount,
        scheduler.getQueueManager().getLeafQueue("queue1", true)
            .getRunnableAppSchedulables().size());

    System.out.println("GC stats before NodeUpdate processing:");
    printGCStats();

    // Time a burst of node update events; only this loop is measured.
    final int heartbeats = 5000;
    final long startNanos = System.nanoTime();
    for (int beat = 0; beat < heartbeats; beat++) {
      String hostName = String.format("192.1.%d.%d", beat / 256, beat % 256);
      RMNode updatedNode = MockNodes.newNodeInfo(
          1, Resources.createResource(memoryPerNode), 5000, hostName);
      scheduler.handle(new NodeUpdateSchedulerEvent(updatedNode));
    }
    final long elapsedNanos = System.nanoTime() - startNanos;

    // Dividing by (heartbeats * 1000) turns total nanoseconds into the
    // average number of microseconds per update.
    System.out.printf("processing time for a NodeUpdate in average: %d us\n",
        elapsedNanos / (heartbeats * 1000));

    System.out.println("GC stats after NodeUpdate processing:");
    printGCStats();
  }




> avoid creating new objects on each fair scheduler call to AppSchedulable 
> comparator
> -----------------------------------------------------------------------------------
>
>                 Key: YARN-1889
>                 URL: https://issues.apache.org/jira/browse/YARN-1889
>             Project: Hadoop YARN
>          Issue Type: Improvement
>          Components: scheduler
>            Reporter: Hong Zhiguo
>            Assignee: Hong Zhiguo
>            Priority: Minor
>              Labels: reviewed
>         Attachments: YARN-1889.patch, YARN-1889.patch
>
>
> In fair scheduler, in each scheduling attempt, a full sort is
> performed on List of AppSchedulable, which invokes Comparator.compare
> method many times. Both FairShareComparator and DRFComparator call
> AppSchedulable.getWeights, and AppSchedulable.getPriority.
> A new ResourceWeights object is allocated on each call of getWeights,
> and the same for getPriority. This introduces a lot of pressure to
> GC because these methods are called very very frequently.
> The test case below shows the improvement in performance and GC behaviour. The 
> results show that the GC pressure during NodeUpdate processing is reduced 
> by half with this patch.
> The code to show the improvement: (Add it to TestFairScheduler.java)
> {code}
> import java.lang.management.GarbageCollectorMXBean;
> import java.lang.management.ManagementFactory;
>   public void printGCStats() {
>     long totalGarbageCollections = 0;
>     long garbageCollectionTime = 0;
>     for(GarbageCollectorMXBean gc :
>       ManagementFactory.getGarbageCollectorMXBeans()) {
>       long count = gc.getCollectionCount();
>       if(count >= 0) {
>         totalGarbageCollections += count;
>       }
>       long time = gc.getCollectionTime();
>       if(time >= 0) {
>         garbageCollectionTime += time;
>       }
>     }
>     System.out.println("Total Garbage Collections: "
>         + totalGarbageCollections);
>     System.out.println("Total Garbage Collection Time (ms): "
>         + garbageCollectionTime);
>   }
>   @Test
>   public void testImpactOnGC() throws Exception {
>     scheduler.reinitialize(conf, resourceManager.getRMContext());
>     // Add nodes
>     int numNode = 10000;
>     for (int i = 0; i < numNode; ++i) {
>         String host = String.format("192.1.%d.%d", i/256, i%256);
>         RMNode node =
>             MockNodes.newNodeInfo(1, Resources.createResource(1024 * 64), i, 
> host);
>         NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
>         scheduler.handle(nodeEvent);
>         assertEquals(1024 * 64 * (i+1), 
> scheduler.getClusterCapacity().getMemory());
>     }
>     assertEquals(numNode, scheduler.getNumClusterNodes());
>     assertEquals(1024 * 64 * numNode, 
> scheduler.getClusterCapacity().getMemory());
>     // add apps, each app has 100 containers.
>     int minReqSize =
>         
> FairSchedulerConfiguration.DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB;
>     int numApp = 8000;
>     int priority = 1;
>     for (int i = 1; i < numApp + 1; ++i) {
>         ApplicationAttemptId attemptId = createAppAttemptId(i, 1);
>         AppAddedSchedulerEvent appAddedEvent = new AppAddedSchedulerEvent(
>                         attemptId.getApplicationId(), "queue1", "user1");
>         scheduler.handle(appAddedEvent);
>         AppAttemptAddedSchedulerEvent attemptAddedEvent =
>             new AppAttemptAddedSchedulerEvent(attemptId, false);
>         scheduler.handle(attemptAddedEvent);
>         createSchedulingRequestExistingApplication(minReqSize * 2, 1, 
> priority, attemptId);
>     }
>     scheduler.update();
>     assertEquals(numApp, scheduler.getQueueManager().getLeafQueue("queue1", 
> true)
>         .getRunnableAppSchedulables().size());
>     System.out.println("GC stats before NodeUpdate processing:");
>     printGCStats();
>     int hb_num = 5000;
>     long start = System.nanoTime();
>     for (int i = 0; i < hb_num; ++i) {
>       String host = String.format("192.1.%d.%d", i/256, i%256);
>       RMNode node =
>           MockNodes.newNodeInfo(1, Resources.createResource(1024 * 64), 5000, 
> host);
>       NodeUpdateSchedulerEvent nodeEvent = new NodeUpdateSchedulerEvent(node);
>       scheduler.handle(nodeEvent);
>     }
>     long end = System.nanoTime();
>     System.out.printf("processing time for a NodeUpdate in average: %d us\n",
>                 (end - start)/(hb_num * 1000));
>     System.out.println("GC stats after NodeUpdate processing:");
>     printGCStats();
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)

Reply via email to