YARN-3090. DeletionService can silently ignore deletion task failures. 
Contributed by Varun Saxena


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4eb5f7fa
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4eb5f7fa
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4eb5f7fa

Branch: refs/heads/YARN-2928
Commit: 4eb5f7fa32bab1b9ce3fb58eca51e2cd2e194cd5
Parents: e0ec071
Author: Jason Lowe <jl...@apache.org>
Authored: Tue Feb 10 16:54:21 2015 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Tue Feb 10 16:54:21 2015 +0000

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |  3 ++
 .../server/nodemanager/DeletionService.java     | 40 +++++++++++++++++---
 2 files changed, 38 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4eb5f7fa/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index fbeca6a..5a3a505 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -527,6 +527,9 @@ Release 2.7.0 - UNRELEASED
     YARN-2971. RM uses conf instead of token service address to renew timeline
     delegation tokens (jeagles)
 
+    YARN-3090. DeletionService can silently ignore deletion task failures
+    (Varun Saxena via jlowe)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4eb5f7fa/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
index e4025f5..4e00a1c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
@@ -29,6 +29,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.FutureTask;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
@@ -113,13 +115,13 @@ public class DeletionService extends AbstractService {
       .setNameFormat("DeletionService #%d")
       .build();
     if (conf != null) {
-      sched = new ScheduledThreadPoolExecutor(
-          conf.getInt(YarnConfiguration.NM_DELETE_THREAD_COUNT, YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT),
-          tf);
+      sched = new DelServiceSchedThreadPoolExecutor(
+          conf.getInt(YarnConfiguration.NM_DELETE_THREAD_COUNT,
+          YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT), tf);
       debugDelay = conf.getInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0);
     } else {
-      sched = new ScheduledThreadPoolExecutor(YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT,
-          tf);
+      sched = new DelServiceSchedThreadPoolExecutor(
+          YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT, tf);
     }
     sched.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
     sched.setKeepAliveTime(60L, SECONDS);
@@ -155,6 +157,34 @@ public class DeletionService extends AbstractService {
     return getServiceState() == STATE.STOPPED && sched.isTerminated();
   }
 
+  /**
+   * Scheduled thread pool that logs any failure of a task it has run, so
+   * that deletion task failures are not silently ignored.
+   */
+  private static class DelServiceSchedThreadPoolExecutor extends
+      ScheduledThreadPoolExecutor {
+    /**
+     * @param corePoolSize number of threads passed to the parent executor
+     * @param threadFactory factory passed to the parent executor
+     */
+    public DelServiceSchedThreadPoolExecutor(int corePoolSize,
+        ThreadFactory threadFactory) {
+      super(corePoolSize, threadFactory);
+    }
+
+    /**
+     * Logs the failure, if any, of a task that has just finished running.
+     *
+     * @param task the task that was executed
+     * @param exception the exception that terminated the task, or null
+     */
+    @Override
+    protected void afterExecute(Runnable task, Throwable exception) {
+      super.afterExecute(task, exception);
+      if (task instanceof FutureTask<?>) {
+        FutureTask<?> futureTask = (FutureTask<?>) task;
+        // Only completed futures are queried: get() would block the worker
+        // thread on a task that has not reached a final state.
+        if (futureTask.isDone() && !futureTask.isCancelled()) {
+          try {
+            // A FutureTask stores the exception thrown by its body; get()
+            // rethrows it wrapped in an ExecutionException.
+            futureTask.get();
+          } catch (ExecutionException ee) {
+            exception = ee.getCause();
+          } catch (InterruptedException ie) {
+            // Restore the interrupt status so the pool still sees it.
+            Thread.currentThread().interrupt();
+            exception = ie;
+          }
+        }
+      }
+      if (exception != null) {
+        LOG.error("Exception during execution of task in DeletionService",
+          exception);
+      }
+    }
+  }
+
   public static class FileDeletionTask implements Runnable {
     public static final int INVALID_TASK_ID = -1;
     private int taskId;

Reply via email to