YARN-3074. Nodemanager dies when localizer runner tries to write to a full 
disk. Contributed by Varun Saxena


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b379972a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b379972a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b379972a

Branch: refs/heads/YARN-2928
Commit: b379972ab39551d4b57436a54c0098a63742c7e1
Parents: b94c111
Author: Jason Lowe <jl...@apache.org>
Authored: Wed Feb 11 16:33:43 2015 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Wed Feb 11 16:33:43 2015 +0000

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |  3 +
 .../localizer/ResourceLocalizationService.java  | 19 +++--
 .../TestResourceLocalizationService.java        | 82 ++++++++++++++++++++
 3 files changed, 98 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b379972a/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index a1c3407..ba5490c 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -546,6 +546,9 @@ Release 2.7.0 - UNRELEASED
     YARN-3160. Fix non-atomic operation on nodeUpdateQueue in RMNodeImpl. 
     (Chengbing Liu via junping_du)
 
+    YARN-3074. Nodemanager dies when localizer runner tries to write to a full
+    disk (Varun Saxena via jlowe)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b379972a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
index 8c84132..dd50ead 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
@@ -55,6 +55,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FSError;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
@@ -1063,6 +1064,7 @@ public class ResourceLocalizationService extends CompositeService
     @SuppressWarnings("unchecked") // dispatcher not typed
     public void run() {
       Path nmPrivateCTokensPath = null;
+      Throwable exception = null;
       try {
         // Get nmPrivateDir
         nmPrivateCTokensPath =
@@ -1090,14 +1092,19 @@ public class ResourceLocalizationService extends CompositeService
               + dirsHandler.getDisksHealthReport(false));
         }
       // TODO handle ExitCodeException separately?
+      } catch (FSError fe) {
+        exception = fe;
       } catch (Exception e) {
-        LOG.info("Localizer failed", e);
-        // 3) on error, report failure to Container and signal ABORT
-        // 3.1) notify resource of failed localization
-        ContainerId cId = context.getContainerId();
-        dispatcher.getEventHandler().handle(
-            new ContainerResourceFailedEvent(cId, null, e.getMessage()));
+        exception = e;
       } finally {
+        if (exception != null) {
+          LOG.info("Localizer failed", exception);
+          // On error, report failure to Container and signal ABORT
+          // Notify resource of failed localization
+          ContainerId cId = context.getContainerId();
+          dispatcher.getEventHandler().handle(new ContainerResourceFailedEvent(
+              cId, null, exception.getMessage()));
+        }
         for (LocalizerResourceRequestEvent event : scheduled.values()) {
           event.getResource().unlock();
         }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b379972a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
index 30af5a4..d3c3521 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
@@ -43,6 +43,7 @@ import static org.mockito.Mockito.when;
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.Constructor;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -69,6 +70,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.AbstractFileSystem;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSError;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Options.ChecksumOpt;
@@ -715,6 +717,86 @@ public class TestResourceLocalizationService {
       stateStore.close();
     }
   }
+  
+
+  @Test( timeout = 10000)
+  @SuppressWarnings("unchecked") // mocked generics
+  public void testLocalizerRunnerException() throws Exception {
+    DrainDispatcher dispatcher = new DrainDispatcher();
+    dispatcher.init(conf);
+    dispatcher.start();
+    EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
+    dispatcher.register(ApplicationEventType.class, applicationBus);
+    EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
+    dispatcher.register(ContainerEventType.class, containerBus);
+
+    ContainerExecutor exec = mock(ContainerExecutor.class);
+    LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
+    LocalDirsHandlerService dirsHandlerSpy = spy(dirsHandler);
+    dirsHandlerSpy.init(conf);
+
+    DeletionService delServiceReal = new DeletionService(exec);
+    DeletionService delService = spy(delServiceReal);
+    delService.init(new Configuration());
+    delService.start();
+
+    ResourceLocalizationService rawService =
+        new ResourceLocalizationService(dispatcher, exec, delService,
+        dirsHandlerSpy, nmContext);
+    ResourceLocalizationService spyService = spy(rawService);
+    doReturn(mockServer).when(spyService).createServer();
+    try {
+      spyService.init(conf);
+      spyService.start();
+
+      // init application
+      final Application app = mock(Application.class);
+      final ApplicationId appId =
+          BuilderUtils.newApplicationId(314159265358979L, 3);
+      when(app.getUser()).thenReturn("user0");
+      when(app.getAppId()).thenReturn(appId);
+      spyService.handle(new ApplicationLocalizationEvent(
+          LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
+      dispatcher.await();
+
+      Random r = new Random();
+      long seed = r.nextLong();
+      System.out.println("SEED: " + seed);
+      r.setSeed(seed);
+      final Container c = getMockContainer(appId, 42, "user0");
+      final LocalResource resource1 = getPrivateMockedResource(r);
+      System.out.println("Here 4");
+      
+      final LocalResourceRequest req1 = new LocalResourceRequest(resource1);
+      Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
+        new HashMap<LocalResourceVisibility, 
+                    Collection<LocalResourceRequest>>();
+      List<LocalResourceRequest> privateResourceList =
+          new ArrayList<LocalResourceRequest>();
+      privateResourceList.add(req1);
+      rsrcs.put(LocalResourceVisibility.PRIVATE, privateResourceList);
+
+      final Constructor<?>[] constructors =
+          FSError.class.getDeclaredConstructors();
+      constructors[0].setAccessible(true);
+      FSError fsError =
+          (FSError) constructors[0].newInstance(new IOException("Disk Error"));
+
+      Mockito
+        .doThrow(fsError)
+        .when(dirsHandlerSpy)
+        .getLocalPathForWrite(isA(String.class));
+      spyService.handle(new ContainerLocalizationRequestEvent(c, rsrcs));
+      Thread.sleep(1000);
+      dispatcher.await();
+      // Verify if ContainerResourceFailedEvent is invoked on FSError
+      verify(containerBus).handle(isA(ContainerResourceFailedEvent.class));
+    } finally {
+      spyService.stop();
+      dispatcher.stop();
+      delService.stop();
+    }
+  }
 
   @Test( timeout = 10000)
   @SuppressWarnings("unchecked") // mocked generics

Reply via email to