[
https://issues.apache.org/jira/browse/TWILL-63?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15806592#comment-15806592
]
ASF GitHub Bot commented on TWILL-63:
-------------------------------------
Github user albertshau commented on a diff in the pull request:
https://github.com/apache/twill/pull/21#discussion_r95050388
--- Diff:
twill-yarn/src/main/java/org/apache/twill/yarn/LocationCacheCleaner.java ---
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Predicate;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.common.util.concurrent.Futures;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.twill.api.Configs;
+import org.apache.twill.common.Threads;
+import org.apache.twill.filesystem.Location;
+import org.apache.twill.internal.io.LocationCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Responsible for cleanup of {@link LocationCache}.
+ */
+final class LocationCacheCleaner extends AbstractIdleService {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LocationCacheCleaner.class);
+
+  // Cache layout: directories are listed under cacheBaseLocation; the one named sessionId belongs to this session.
+  private final Location cacheBaseLocation;
+  private final String sessionId;
+
+  // Expiry in milliseconds for entries of this session (expiry) and for entries of other sessions (antiqueExpiry).
+  private final long expiry;
+  private final long antiqueExpiry;
+
+  // Accepts locations that may be removed; a location it rejects is considered to be in use.
+  private final Predicate<Location> cleanupPredicate;
+  // Locations waiting to be deleted, compared by location only. Not synchronized: it is mutated in cleanup(),
+  // which startUp() and forceCleanup() both run on the single scheduler thread.
+  private final Set<PendingCleanup> pendingCleanups;
+  // Created in startUp(); runs the periodic cleanup task.
+  private ScheduledExecutorService scheduler;
+
+  /**
+   * Creates a cleaner for the cache directories found under {@code cacheBaseLocation}.
+   *
+   * @param config configuration supplying the expiry settings
+   * @param cacheBaseLocation location whose child directories are scanned for cache entries
+   * @param sessionId name of the cache directory that belongs to the current session
+   * @param cleanupPredicate returns {@code true} for locations that may be removed; a rejected
+   *                         location is treated as still in use
+   */
+  LocationCacheCleaner(Configuration config, Location cacheBaseLocation,
+                       String sessionId, Predicate<Location> cleanupPredicate) {
+    this.cleanupPredicate = cleanupPredicate;
+    this.cacheBaseLocation = cacheBaseLocation;
+    this.sessionId = sessionId;
+    this.pendingCleanups = new HashSet<>();
+
+    // Expiry (ms) applied to entries of the current session and to entries left by other sessions.
+    long sessionExpiry = config.getLong(Configs.Keys.LOCATION_CACHE_EXPIRY_MS,
+                                        Configs.Defaults.LOCATION_CACHE_EXPIRY_MS);
+    long otherSessionExpiry = config.getLong(Configs.Keys.LOCATION_CACHE_ANTIQUE_EXPIRY_MS,
+                                             Configs.Defaults.LOCATION_CACHE_ANTIQUE_EXPIRY_MS);
+    this.expiry = sessionExpiry;
+    this.antiqueExpiry = otherSessionExpiry;
+  }
+
+  /**
+   * Creates the single-threaded scheduler and starts the self-rescheduling cleanup task.
+   * <p>
+   * Each run performs {@link #cleanup(long)} and then schedules the next run after half of the
+   * configured expiry, or earlier if a pending entry becomes due before that.
+   */
+  @Override
+  protected void startUp() throws Exception {
+    scheduler = Executors.newSingleThreadScheduledExecutor(Threads.createDaemonThreadFactory("location-cache-cleanup"));
+    scheduler.execute(new Runnable() {
+      @Override
+      public void run() {
+        long currentTime = System.currentTimeMillis();
+        try {
+          cleanup(currentTime);
+        } catch (Exception e) {
+          // This task reschedules itself below; letting an exception (e.g. from the cleanup predicate)
+          // escape would end the periodic cleanup for good, so log it and keep the schedule going.
+          LOG.warn("Exception raised during location cache cleanup", e);
+        }
+
+        // By default, run the cleanup at half of the expiry
+        long scheduleDelay = expiry / 2;
+        for (PendingCleanup pendingCleanup : pendingCleanups) {
+          long timeToExpire = pendingCleanup.getExpireTime() - currentTime;
+          // If a pending cleanup becomes due before the default delay, schedule the next run earlier.
+          // Entries already past their expire time are ones whose deletion did not succeed (or was not
+          // reached) in this run; they are retried on the regular cycle. Using their non-positive delay
+          // would rerun this task immediately and spin for as long as the deletion keeps failing.
+          if (timeToExpire > 0 && timeToExpire < scheduleDelay) {
+            scheduleDelay = timeToExpire;
+          }
+        }
+        scheduler.schedule(this, scheduleDelay, TimeUnit.MILLISECONDS);
+      }
+    });
+  }
+
+  /**
+   * Stops the cleanup scheduler immediately: scheduled runs are dropped and a run in progress is
+   * sent an interrupt.
+   */
+  @Override
+  protected void shutDown() throws Exception {
+    ScheduledExecutorService executor = scheduler;
+    executor.shutdownNow();
+  }
+
+  /**
+   * Runs one cleanup pass for the given time and blocks until it has finished.
+   * The pass is submitted to the scheduler rather than run on the caller's thread.
+   *
+   * @param currentTime the time, in milliseconds, treated as "now" by the cleanup pass
+   */
+  @VisibleForTesting
+  void forceCleanup(final long currentTime) {
+    Runnable cleanupTask = new Runnable() {
+      @Override
+      public void run() {
+        cleanup(currentTime);
+      }
+    };
+    Futures.getUnchecked(scheduler.submit(cleanupTask));
+  }
+
+  /**
+   * Performs cleanup based on the given time.
+   * <p>
+   * First processes the entries already pending cleanup:
+   * <ul>
+   *   <li>entries rejected by the predicate (in use) are dropped;</li>
+   *   <li>expired entries are deleted;</li>
+   *   <li>the rest are kept.</li>
+   * </ul>
+   * Then scans every cache directory under the base location and registers each location accepted
+   * by the predicate as pending. The expire time is {@code currentTime + expiry} for the current
+   * session's directory and {@code currentTime + antiqueExpiry} for any other directory.
+   *
+   * @param currentTime the time, in milliseconds, treated as "now" for expiry decisions
+   */
+  private void cleanup(long currentTime) {
+    // First go through the pending cleanup list and remove those that can be removed
+    Iterator<PendingCleanup> iterator = pendingCleanups.iterator();
+    while (iterator.hasNext()) {
+      PendingCleanup pendingCleanup = iterator.next();
+
+      // If rejected by the predicate, it means it is being used, hence remove it from the pending cleanup list.
+      if (!cleanupPredicate.apply(pendingCleanup.getLocation())) {
+        iterator.remove();
+      } else {
+        try {
+          // If time is up for the pending entry, the location will be deleted,
+          // hence can be removed from the pending cleanup list.
+          // Otherwise retain it for the next cycle.
+          if (pendingCleanup.deleteIfExpired(currentTime)) {
+            iterator.remove();
+          }
+        } catch (IOException e) {
+          // Log and retain the entry so that another attempt on deletion will be made in next cleanup cycle
+          LOG.warn("Failed to delete {}", pendingCleanup.getLocation(), e);
+        }
+      }
+    }
+
+    // Then collect the next set of locations to be removed
+    try {
+      for (Location cacheDir : cacheBaseLocation.list()) {
+        try {
+          // The expire time depends only on which session owns the directory, so compute it once per directory.
+          // Cache entries from a different YarnTwillRunnerService session use the antique expiry time.
+          long expireTime = currentTime + (cacheDir.getName().equals(sessionId) ? expiry : antiqueExpiry);
+          for (Location location : cacheDir.list()) {
+            if (cleanupPredicate.apply(location)) {
+              // If the location is already pending for cleanup, this won't update the expire time as
+              // the comparison of PendingCleanup is only by location.
+              pendingCleanups.add(new PendingCleanup(location, expireTime));
+            }
+          }
+        } catch (IOException e) {
+          LOG.warn("Failed to list cache content from {}", cacheDir, e);
+        }
+      }
+    } catch (IOException e) {
+      LOG.warn("Failed to list cache directories from {}", cacheBaseLocation, e);
+    }
+  }
+
+ /**
+ * Class for holding information about cache location that is pending to
be removed.
+ * The equality and hash code is only based on the location.
+ */
+ private static final class PendingCleanup {
+ private final Location location;
+ private final long expireTime;
+
+ PendingCleanup(Location location, long expireTime) {
+ this.location = location;
+ this.expireTime = expireTime;
+ }
+
+ Location getLocation() {
+ return location;
+ }
+
+ long getExpireTime() {
+ return expireTime;
+ }
+
+ boolean deleteIfExpired(long currentTime) throws IOException {
+ if (currentTime < expireTime) {
+ return false;
+ }
+ location.delete();
--- End diff --
shouldn't this return location.delete()?
> Speed up application launch time
> --------------------------------
>
> Key: TWILL-63
> URL: https://issues.apache.org/jira/browse/TWILL-63
> Project: Apache Twill
> Issue Type: Improvement
> Components: yarn
> Affects Versions: 0.2.0-incubating
> Reporter: Terence Yim
> Assignee: Terence Yim
> Fix For: 0.10.0
>
>
> Currently when launching an application, two new jars are always created
> locally, one for AM (appMaster.jar) and one for Container (container.jar), and
> copied to HDFS before submitting the application. The jar files could
> potentially be big, and if they haven't changed, they shouldn't need to be
> copied to HDFS again.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)