YARN-3090. DeletionService can silently ignore deletion task failures. Contributed by Varun Saxena
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4eb5f7fa Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4eb5f7fa Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4eb5f7fa Branch: refs/heads/YARN-2928 Commit: 4eb5f7fa32bab1b9ce3fb58eca51e2cd2e194cd5 Parents: e0ec071 Author: Jason Lowe <jl...@apache.org> Authored: Tue Feb 10 16:54:21 2015 +0000 Committer: Jason Lowe <jl...@apache.org> Committed: Tue Feb 10 16:54:21 2015 +0000 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 ++ .../server/nodemanager/DeletionService.java | 40 +++++++++++++++++--- 2 files changed, 38 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/4eb5f7fa/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index fbeca6a..5a3a505 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -527,6 +527,9 @@ Release 2.7.0 - UNRELEASED YARN-2971. RM uses conf instead of token service address to renew timeline delegation tokens (jeagles) + YARN-3090. DeletionService can silently ignore deletion task failures + (Varun Saxena via jlowe) + Release 2.6.0 - 2014-11-18 INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/4eb5f7fa/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java index e4025f5..4e00a1c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java @@ -29,6 +29,8 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.FutureTask; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; @@ -113,13 +115,13 @@ public class DeletionService extends AbstractService { .setNameFormat("DeletionService #%d") .build(); if (conf != null) { - sched = new ScheduledThreadPoolExecutor( - conf.getInt(YarnConfiguration.NM_DELETE_THREAD_COUNT, YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT), - tf); + sched = new DelServiceSchedThreadPoolExecutor( + conf.getInt(YarnConfiguration.NM_DELETE_THREAD_COUNT, + YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT), tf); debugDelay = conf.getInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0); } else { - sched = new ScheduledThreadPoolExecutor(YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT, - tf); + sched = new DelServiceSchedThreadPoolExecutor( + YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT, tf); } sched.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); sched.setKeepAliveTime(60L, SECONDS); @@ -155,6 +157,34 @@ public class DeletionService extends AbstractService { return getServiceState() == STATE.STOPPED && sched.isTerminated(); } + private static class DelServiceSchedThreadPoolExecutor extends + ScheduledThreadPoolExecutor { + public DelServiceSchedThreadPoolExecutor(int corePoolSize, + ThreadFactory threadFactory) { + super(corePoolSize, threadFactory); + } + + @Override + protected void afterExecute(Runnable task, Throwable exception) { + if (task instanceof FutureTask<?>) { + FutureTask<?> futureTask = (FutureTask<?>) task; + if (!futureTask.isCancelled()) { + try { + futureTask.get(); + } catch (ExecutionException ee) { + exception = ee.getCause(); + } catch (InterruptedException ie) { + exception = ie; + } + } + } + if (exception != null) { + LOG.error("Exception during execution of task in DeletionService", + exception); + } + } + } + public static class FileDeletionTask implements Runnable { public static final int INVALID_TASK_ID = -1; private int taskId;