Author: bobby
Date: Wed Feb 27 15:30:10 2013
New Revision: 1450807

URL: http://svn.apache.org/r1450807
Log:
YARN-426. Failure to download a public resource prevents further downloads 
(Jason Lowe via bobby)

Modified:
    hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
    
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
    
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java

Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1450807&r1=1450806&r2=1450807&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Wed Feb 27 15:30:10 2013
@@ -343,6 +343,9 @@ Release 0.23.7 - UNRELEASED
     YARN-400. RM can return null application resource usage report leading to 
     NPE in client (Jason Lowe via tgraves)
 
+    YARN-426. Failure to download a public resource prevents further downloads
+    (Jason Lowe via bobby)
+
 Release 0.23.6 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1450807&r1=1450806&r2=1450807&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
 (original)
+++ 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
 Wed Feb 27 15:30:10 2013
@@ -659,25 +659,23 @@ public class ResourceLocalizationService
                   new ContainerResourceFailedEvent(
                     assoc.getContext().getContainerId(),
                     assoc.getResource().getRequest(), e.getCause()));
+              List<LocalizerResourceRequestEvent> reqs;
               synchronized (attempts) {
                 LocalResourceRequest req = assoc.getResource().getRequest();
-                List<LocalizerResourceRequestEvent> reqs = attempts.get(req);
+                reqs = attempts.get(req);
                 if (null == reqs) {
                   LOG.error("Missing pending list for " + req);
                   return;
                 }
-                if (reqs.isEmpty()) {
-                  attempts.remove(req);
-                }
-                /* 
-                 * Do not retry for now. Once failed is failed!
-                 *  LocalizerResourceRequestEvent request = reqs.remove(0);
-
-                pending.put(queue.submit(new FSDownload(
-                    lfs, null, conf, publicDirs,
-                    request.getResource().getRequest(), new Random())),
-                    request);
-                 */              }
+                attempts.remove(req);
+              }
+              // let the other containers know about the localization failure
+              for (LocalizerResourceRequestEvent reqEvent : reqs) {
+                dispatcher.getEventHandler().handle(
+                    new ContainerResourceFailedEvent(
+                        reqEvent.getContext().getContainerId(),
+                        reqEvent.getResource().getRequest(), e.getCause()));
+              }
             } catch (CancellationException e) {
               // ignore; shutting down
             }

Modified: 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java?rev=1450807&r1=1450806&r2=1450807&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
 (original)
+++ 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
 Wed Feb 27 15:30:10 2013
@@ -27,13 +27,16 @@ import static org.mockito.Matchers.argTh
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Matchers.isNull;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.util.ArrayList;
@@ -46,6 +49,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
 
 import junit.framework.Assert;
 
@@ -89,6 +94,7 @@ import org.apache.hadoop.yarn.server.nod
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
+import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.LocalizerTracker;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
@@ -102,6 +108,8 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.ArgumentMatcher;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 public class TestResourceLocalizationService {
 
@@ -512,6 +520,111 @@ public class TestResourceLocalizationSer
     }
   }
 
+  @Test(timeout=20000)
+  @SuppressWarnings("unchecked") // mocked generics
+  public void testFailedPublicResource() throws Exception {
+    Configuration conf = new YarnConfiguration();
+    AbstractFileSystem spylfs =
+      spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
+    final FileContext lfs = FileContext.getFileContext(spylfs, conf);
+    doNothing().when(spylfs).mkdir(
+        isA(Path.class), isA(FsPermission.class), anyBoolean());
+    List<Path> localDirs = new ArrayList<Path>();
+    String[] sDirs = new String[4];
+    for (int i = 0; i < 4; ++i) {
+      localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
+      sDirs[i] = localDirs.get(i).toString();
+    }
+    conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
+    String logDir = lfs.makeQualified(new Path(basedir, "logdir " 
)).toString();
+    conf.set(YarnConfiguration.NM_LOG_DIRS, logDir);
+
+    DrainDispatcher dispatcher = new DrainDispatcher();
+    EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
+    dispatcher.register(ApplicationEventType.class, applicationBus);
+    EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
+    dispatcher.register(ContainerEventType.class, containerBus);
+
+    ContainerExecutor exec = mock(ContainerExecutor.class);
+    DeletionService delService = mock(DeletionService.class);
+    LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
+    dirsHandler.init(conf);
+
+    dispatcher.init(conf);
+    dispatcher.start();
+
+    try {
+      ResourceLocalizationService rawService =
+          new ResourceLocalizationService(dispatcher, exec, delService,
+                                        dirsHandler);
+      ResourceLocalizationService spyService = spy(rawService);
+      doReturn(mockServer).when(spyService).createServer();
+      doReturn(lfs).when(spyService).getLocalFileContext(
+          isA(Configuration.class));
+
+      spyService.init(conf);
+      spyService.start();
+
+      final String user = "user0";
+      // init application
+      final Application app = mock(Application.class);
+      final ApplicationId appId =
+          BuilderUtils.newApplicationId(314159265358979L, 3);
+      when(app.getUser()).thenReturn(user);
+      when(app.getAppId()).thenReturn(appId);
+      spyService.handle(new ApplicationLocalizationEvent(
+          LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
+      dispatcher.await();
+
+      // init container.
+      final Container c = getMockContainer(appId, 42);
+
+      // init resources
+      Random r = new Random();
+      long seed = r.nextLong();
+      System.out.println("SEED: " + seed);
+      r.setSeed(seed);
+
+      // cause chmod to fail after a delay
+      final CyclicBarrier barrier = new CyclicBarrier(2);
+      doAnswer(new Answer<Void>() {
+          public Void answer(InvocationOnMock invocation) throws IOException {
+            try {
+              barrier.await();
+            } catch (InterruptedException e) {
+            } catch (BrokenBarrierException e) {
+            }
+            throw new IOException("forced failure");
+          }
+        }).when(spylfs)
+            .setPermission(isA(Path.class), isA(FsPermission.class));
+
+      // Queue up two localization requests for the same public resource
+      final LocalResource pubResource = getPublicMockedResource(r);
+      final LocalResourceRequest pubReq = new 
LocalResourceRequest(pubResource);
+
+      Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
+          new HashMap<LocalResourceVisibility,
+                      Collection<LocalResourceRequest>>();
+      req.put(LocalResourceVisibility.PUBLIC,
+          Collections.singletonList(pubReq));
+
+      Set<LocalResourceRequest> pubRsrcs = new HashSet<LocalResourceRequest>();
+      pubRsrcs.add(pubReq);
+
+      spyService.handle(new ContainerLocalizationRequestEvent(c, req));
+      spyService.handle(new ContainerLocalizationRequestEvent(c, req));
+      dispatcher.await();
+
+      // allow the chmod to fail now that both requests have been queued
+      barrier.await();
+      verify(containerBus, timeout(5000).times(2))
+          .handle(isA(ContainerResourceFailedEvent.class));
+    } finally {
+      dispatcher.stop();
+    }
+  }
+
   private static URL getPath(String path) {
     URL url = BuilderUtils.newURL("file", null, 0, path);
     return url;


Reply via email to