[ 
https://issues.apache.org/jira/browse/TWILL-63?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15806592#comment-15806592
 ] 

ASF GitHub Bot commented on TWILL-63:
-------------------------------------

Github user albertshau commented on a diff in the pull request:

    https://github.com/apache/twill/pull/21#discussion_r95050388
  
    --- Diff: 
twill-yarn/src/main/java/org/apache/twill/yarn/LocationCacheCleaner.java ---
    @@ -0,0 +1,206 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.twill.yarn;
    +
    +import com.google.common.annotations.VisibleForTesting;
    +import com.google.common.base.Predicate;
    +import com.google.common.util.concurrent.AbstractIdleService;
    +import com.google.common.util.concurrent.Futures;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.twill.api.Configs;
    +import org.apache.twill.common.Threads;
    +import org.apache.twill.filesystem.Location;
    +import org.apache.twill.internal.io.LocationCache;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.Objects;
    +import java.util.Set;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.ScheduledExecutorService;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * Responsible for cleanup of {@link LocationCache}.
    + */
    +final class LocationCacheCleaner extends AbstractIdleService {
    +
    +  private static final Logger LOG = 
LoggerFactory.getLogger(LocationCacheCleaner.class);
    +
    +  private final Location cacheBaseLocation;
    +  private final String sessionId;
    +  private final long expiry;
    +  private final long antiqueExpiry;
    +  private final Predicate<Location> cleanupPredicate;
    +  private final Set<PendingCleanup> pendingCleanups;
    +  private ScheduledExecutorService scheduler;
    +
    +  LocationCacheCleaner(Configuration config, Location cacheBaseLocation,
    +                       String sessionId, Predicate<Location> 
cleanupPredicate) {
    +    this.cacheBaseLocation = cacheBaseLocation;
    +    this.sessionId = sessionId;
    +    this.expiry = config.getLong(Configs.Keys.LOCATION_CACHE_EXPIRY_MS,
    +                                 
Configs.Defaults.LOCATION_CACHE_EXPIRY_MS);
    +    this.antiqueExpiry = 
config.getLong(Configs.Keys.LOCATION_CACHE_ANTIQUE_EXPIRY_MS,
    +                                        
Configs.Defaults.LOCATION_CACHE_ANTIQUE_EXPIRY_MS);
    +    this.cleanupPredicate = cleanupPredicate;
    +    this.pendingCleanups = new HashSet<>();
    +  }
    +
    +  @Override
    +  protected void startUp() throws Exception {
    +    scheduler = 
Executors.newSingleThreadScheduledExecutor(Threads.createDaemonThreadFactory("location-cache-cleanup"));
    +    scheduler.execute(new Runnable() {
    +      @Override
    +      public void run() {
    +        long currentTime = System.currentTimeMillis();
    +        cleanup(currentTime);
    +
    +        // By default, run the cleanup at half of the expiry
    +        long scheduleDelay = expiry / 2;
    +        for (PendingCleanup pendingCleanup : pendingCleanups) {
    +          // If there is any pending cleanup that needs to be cleanup 
early, schedule the run earlier.
    +          if (pendingCleanup.getExpireTime() - currentTime < 
scheduleDelay) {
    +            scheduleDelay = pendingCleanup.getExpireTime() - currentTime;
    +          }
    +        }
    +        scheduler.schedule(this, scheduleDelay, TimeUnit.MILLISECONDS);
    +      }
    +    });
    +  }
    +
    +  @Override
    +  protected void shutDown() throws Exception {
    +    scheduler.shutdownNow();
    +  }
    +
    +  @VisibleForTesting
    +  void forceCleanup(final long currentTime) {
    +    Futures.getUnchecked(scheduler.submit(new Runnable() {
    +      @Override
    +      public void run() {
    +        cleanup(currentTime);
    +      }
    +    }));
    +  }
    +
    +  /**
    +   * Performs cleanup based on the given time.
    +   */
    +  private void cleanup(long currentTime) {
    +    // First go through the pending cleanup list and remove those that can 
be removed
    +    Iterator<PendingCleanup> iterator = pendingCleanups.iterator();
    +    while (iterator.hasNext()) {
    +      PendingCleanup pendingCleanup = iterator.next();
    +
    +      // If rejected by the predicate, it means it is being used, hence 
remove it from the pending cleanup list.
    +      if (!cleanupPredicate.apply(pendingCleanup.getLocation())) {
    +        iterator.remove();
    +      } else {
    +        try {
    +          // If time is up for the pending entry, the location will be 
deleted,
    +          // hence can be removed from the pending cleanup list.
    +          // Otherwise retain it for the next cycle.
    +          if (pendingCleanup.deleteIfExpired(currentTime)) {
    +            iterator.remove();
    +          }
    +        } catch (IOException e) {
    +          // Log and retain the entry so that another attempt on deletion 
will be made in next cleanup cycle
    +          LOG.warn("Failed to delete {}", pendingCleanup.getLocation(), e);
    +        }
    +      }
    +    }
    +
    +    // Then collects the next set of locations to be removed
    +    try {
    +      for (Location cacheDir : cacheBaseLocation.list()) {
    +        try {
    +          for (Location location : cacheDir.list()) {
    +            if (cleanupPredicate.apply(location)) {
    +              long expireTime = currentTime;
    +              if (cacheDir.getName().equals(sessionId)) {
    +                expireTime += expiry;
    +              } else {
    +                // If the cache entry is from different 
YarnTwillRunnerService session, use the anti expiry time.
    +                expireTime += antiqueExpiry;
    +              }
    +              // If the location is already pending for cleanup, this 
won't update the expire time as
    +              // the comparison of PendingCleanup is only by location.
    +              pendingCleanups.add(new PendingCleanup(location, 
expireTime));
    +            }
    +          }
    +        } catch (IOException e) {
    +          LOG.warn("Failed to list cache content from {}", cacheDir, e);
    +        }
    +      }
    +    } catch (IOException e) {
    +      LOG.warn("Failed to list cache directories from {}", 
cacheBaseLocation, e);
    +    }
    +  }
    +
    +  /**
    +   * Class for holding information about cache location that is pending to 
be removed.
    +   * The equality and hash code is only based on the location.
    +   */
    +  private static final class PendingCleanup {
    +    private final Location location;
    +    private final long expireTime;
    +
    +    PendingCleanup(Location location, long expireTime) {
    +      this.location = location;
    +      this.expireTime = expireTime;
    +    }
    +
    +    Location getLocation() {
    +      return location;
    +    }
    +
    +    long getExpireTime() {
    +      return expireTime;
    +    }
    +
    +    boolean deleteIfExpired(long currentTime) throws IOException {
    +      if (currentTime < expireTime) {
    +        return false;
    +      }
    +      location.delete();
    --- End diff --
    
    shouldn't this return location.delete()?


> Speed up application launch time
> --------------------------------
>
>                 Key: TWILL-63
>                 URL: https://issues.apache.org/jira/browse/TWILL-63
>             Project: Apache Twill
>          Issue Type: Improvement
>          Components: yarn
>    Affects Versions: 0.2.0-incubating
>            Reporter: Terence Yim
>            Assignee: Terence Yim
>             Fix For: 0.10.0
>
>
> Currently, when launching an application, two new jars are always created 
> locally, one for the AM (appMaster.jar) and one for the Container (container.jar), 
> and copied to HDFS before submitting the application. The jar files could 
> potentially be big, and if they haven't changed, they shouldn't need to be 
> copied to HDFS again.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to