YARN-3074. Nodemanager dies when localizer runner tries to write to a full disk. Contributed by Varun Saxena
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b379972a Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b379972a Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b379972a Branch: refs/heads/YARN-2928 Commit: b379972ab39551d4b57436a54c0098a63742c7e1 Parents: b94c111 Author: Jason Lowe <jl...@apache.org> Authored: Wed Feb 11 16:33:43 2015 +0000 Committer: Jason Lowe <jl...@apache.org> Committed: Wed Feb 11 16:33:43 2015 +0000 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../localizer/ResourceLocalizationService.java | 19 +++-- .../TestResourceLocalizationService.java | 82 ++++++++++++++++++++ 3 files changed, 98 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/b379972a/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index a1c3407..ba5490c 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -546,6 +546,9 @@ Release 2.7.0 - UNRELEASED YARN-3160. Fix non-atomic operation on nodeUpdateQueue in RMNodeImpl. (Chengbing Liu via junping_du) + YARN-3074. Nodemanager dies when localizer runner tries to write to a full + disk (Varun Saxena via jlowe) + Release 2.6.0 - 2014-11-18 INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/b379972a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index 8c84132..dd50ead 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -55,6 +55,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.fs.FSError; import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileStatus; @@ -1063,6 +1064,7 @@ public class ResourceLocalizationService extends CompositeService @SuppressWarnings("unchecked") // dispatcher not typed public void run() { Path nmPrivateCTokensPath = null; + Throwable exception = null; try { // Get nmPrivateDir nmPrivateCTokensPath = @@ -1090,14 +1092,19 @@ public class ResourceLocalizationService extends CompositeService + dirsHandler.getDisksHealthReport(false)); } // TODO handle ExitCodeException separately? + } catch (FSError fe) { + exception = fe; } catch (Exception e) { - LOG.info("Localizer failed", e); - // 3) on error, report failure to Container and signal ABORT - // 3.1) notify resource of failed localization - ContainerId cId = context.getContainerId(); - dispatcher.getEventHandler().handle( - new ContainerResourceFailedEvent(cId, null, e.getMessage())); + exception = e; } finally { + if (exception != null) { + LOG.info("Localizer failed", exception); + // On error, report failure to Container and signal ABORT + // Notify resource of failed localization + ContainerId cId = context.getContainerId(); + dispatcher.getEventHandler().handle(new ContainerResourceFailedEvent( + cId, null, exception.getMessage())); + } for (LocalizerResourceRequestEvent event : scheduled.values()) { event.getResource().unlock(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b379972a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java index 30af5a4..d3c3521 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java @@ -43,6 +43,7 @@ import static org.mockito.Mockito.when; import java.io.File; import java.io.IOException; +import java.lang.reflect.Constructor; import java.net.InetSocketAddress; import java.net.URI; import java.net.URISyntaxException; @@ -69,6 +70,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.AbstractFileSystem; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FSError; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Options.ChecksumOpt; @@ -715,6 +717,86 @@ public class TestResourceLocalizationService { stateStore.close(); } } + + + @Test( timeout = 10000) + @SuppressWarnings("unchecked") // mocked generics + public void testLocalizerRunnerException() throws Exception { + DrainDispatcher dispatcher = new DrainDispatcher(); + dispatcher.init(conf); + dispatcher.start(); + EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class); + dispatcher.register(ApplicationEventType.class, applicationBus); + EventHandler<ContainerEvent> containerBus = mock(EventHandler.class); + dispatcher.register(ContainerEventType.class, containerBus); + + ContainerExecutor exec = mock(ContainerExecutor.class); + LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService(); + LocalDirsHandlerService dirsHandlerSpy = spy(dirsHandler); + dirsHandlerSpy.init(conf); + + DeletionService delServiceReal = new DeletionService(exec); + DeletionService delService = spy(delServiceReal); + delService.init(new Configuration()); + delService.start(); + + ResourceLocalizationService rawService = + new ResourceLocalizationService(dispatcher, exec, delService, + dirsHandlerSpy, nmContext); + ResourceLocalizationService spyService = spy(rawService); + doReturn(mockServer).when(spyService).createServer(); + try { + spyService.init(conf); + spyService.start(); + + // init application + final Application app = mock(Application.class); + final ApplicationId appId = + BuilderUtils.newApplicationId(314159265358979L, 3); + when(app.getUser()).thenReturn("user0"); + when(app.getAppId()).thenReturn(appId); + spyService.handle(new ApplicationLocalizationEvent( + LocalizationEventType.INIT_APPLICATION_RESOURCES, app)); + dispatcher.await(); + + Random r = new Random(); + long seed = r.nextLong(); + System.out.println("SEED: " + seed); + r.setSeed(seed); + final Container c = getMockContainer(appId, 42, "user0"); + final LocalResource resource1 = getPrivateMockedResource(r); + System.out.println("Here 4"); + + final LocalResourceRequest req1 = new LocalResourceRequest(resource1); + Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs = + new HashMap<LocalResourceVisibility, + Collection<LocalResourceRequest>>(); + List<LocalResourceRequest> privateResourceList = + new ArrayList<LocalResourceRequest>(); + privateResourceList.add(req1); + rsrcs.put(LocalResourceVisibility.PRIVATE, privateResourceList); + + final Constructor<?>[] constructors = + FSError.class.getDeclaredConstructors(); + constructors[0].setAccessible(true); + FSError fsError = + (FSError) constructors[0].newInstance(new IOException("Disk Error")); + + Mockito + .doThrow(fsError) + .when(dirsHandlerSpy) + .getLocalPathForWrite(isA(String.class)); + spyService.handle(new ContainerLocalizationRequestEvent(c, rsrcs)); + Thread.sleep(1000); + dispatcher.await(); + // Verify if ContainerResourceFailedEvent is invoked on FSError + verify(containerBus).handle(isA(ContainerResourceFailedEvent.class)); + } finally { + spyService.stop(); + dispatcher.stop(); + delService.stop(); + } + } @Test( timeout = 10000) @SuppressWarnings("unchecked") // mocked generics