Author: bikas
Date: Tue Jul  9 23:14:27 2013
New Revision: 1501605

URL: http://svn.apache.org/r1501605
Log:
YARN-369. Handle ( or throw a proper error when receiving) status updates from 
application masters that have not registered (Mayank Bansal & Abhishek Kapoor 
via bikas)

Added:
    
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/InvalidApplicationMasterRequestException.java
Modified:
    hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
    
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
    
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
    
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java

Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1501605&r1=1501604&r2=1501605&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Tue Jul  9 23:14:27 2013
@@ -671,6 +671,10 @@ Release 2.1.0-beta - 2013-07-02
 
     YARN-845. RM crash with NPE on NODE_UPDATE (Mayank Bansal via bikas)
 
+    YARN-369. Handle ( or throw a proper error when receiving) status updates
+    from application masters that have not registered (Mayank Bansal &
+    Abhishek Kapoor via bikas)
+
   BREAKDOWN OF HADOOP-8562 SUBTASKS AND RELATED JIRAS
 
     YARN-158. Yarn creating package-info.java must not depend on sh.

Modified: 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java?rev=1501605&r1=1501604&r2=1501605&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
 (original)
+++ 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
 Tue Jul  9 23:14:27 2013
@@ -195,17 +195,35 @@ public class ApplicationMasterService ex
     // Allow only one thread in AM to do registerApp at a time.
     synchronized (lastResponse) {
 
-      LOG.info("AM registration " + applicationAttemptId);
+      if (hasApplicationMasterRegistered(applicationAttemptId)) {
+        String message =
+            "Application Master is already registered : "
+                + applicationAttemptId.getApplicationId();
+        LOG.warn(message);
+        RMAuditLogger.logFailure(
+          this.rmContext.getRMApps()
+            .get(applicationAttemptId.getApplicationId()).getUser(),
+          AuditConstants.REGISTER_AM, "", "ApplicationMasterService", message,
+          applicationAttemptId.getApplicationId(), applicationAttemptId);
+        throw new InvalidApplicationMasterRequestException(message);
+      }
+      
       this.amLivelinessMonitor.receivedPing(applicationAttemptId);
-
-      this.rmContext.getDispatcher().getEventHandler().handle(
-          new RMAppAttemptRegistrationEvent(applicationAttemptId, request
-              .getHost(), request.getRpcPort(), request.getTrackingUrl()));
-
       RMApp app = this.rmContext.getRMApps().get(appID);
-      RMAuditLogger.logSuccess(app.getUser(),
-          AuditConstants.REGISTER_AM, "ApplicationMasterService", appID,
-          applicationAttemptId);
+      
+      // Setting the response id to 0 to identify if the
+      // application master is register for the respective attemptid
+      lastResponse.setResponseId(0);
+      responseMap.put(applicationAttemptId, lastResponse);
+      LOG.info("AM registration " + applicationAttemptId);
+      this.rmContext
+        .getDispatcher()
+        .getEventHandler()
+        .handle(
+          new RMAppAttemptRegistrationEvent(applicationAttemptId, request
+            .getHost(), request.getRpcPort(), request.getTrackingUrl()));
+      RMAuditLogger.logSuccess(app.getUser(), AuditConstants.REGISTER_AM,
+        "ApplicationMasterService", appID, applicationAttemptId);
 
       // Pick up min/max resource from scheduler...
       RegisterApplicationMasterResponse response = recordFactory
@@ -257,6 +275,24 @@ public class ApplicationMasterService ex
     }
   }
 
+  /**
+   * @param appAttemptId
+   * @return true if application is registered for the respective attemptid
+   */
+  public boolean hasApplicationMasterRegistered(
+      ApplicationAttemptId appAttemptId) {
+    boolean hasApplicationMasterRegistered = false;
+    AllocateResponse lastResponse = responseMap.get(appAttemptId);
+    if (lastResponse != null) {
+      synchronized (lastResponse) {
+        if (lastResponse.getResponseId() >= 0) {
+          hasApplicationMasterRegistered = true;
+        }
+      }
+    }
+    return hasApplicationMasterRegistered;
+  }
+
   @Override
   public AllocateResponse allocate(AllocateRequest request)
       throws YarnException, IOException {
@@ -272,6 +308,20 @@ public class ApplicationMasterService ex
       LOG.error("AppAttemptId doesnt exist in cache " + appAttemptId);
       return resync;
     }
+    
+    if (!hasApplicationMasterRegistered(appAttemptId)) {
+      String message =
+          "Application Master is trying to allocate before registering for: "
+              + appAttemptId.getApplicationId();
+      LOG.error(message);
+      RMAuditLogger.logFailure(
+        this.rmContext.getRMApps().get(appAttemptId.getApplicationId())
+          .getUser(), AuditConstants.REGISTER_AM, "",
+        "ApplicationMasterService", message, appAttemptId.getApplicationId(),
+        appAttemptId);
+      throw new InvalidApplicationMasterRequestException(message);
+    }
+
     if ((request.getResponseId() + 1) == lastResponse.getResponseId()) {
       /* old heartbeat */
       return lastResponse;
@@ -442,7 +492,9 @@ public class ApplicationMasterService ex
   public void registerAppAttempt(ApplicationAttemptId attemptId) {
     AllocateResponse response =
         recordFactory.newRecordInstance(AllocateResponse.class);
-    response.setResponseId(0);
+    // set response id to -1 before application master for the following
+    // attemptID get registered
+    response.setResponseId(-1);
     LOG.info("Registering app attempt : " + attemptId);
     responseMap.put(attemptId, response);
     rmContext.getNMTokenSecretManager().registerApplicationAttempt(attemptId);

Added: 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/InvalidApplicationMasterRequestException.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/InvalidApplicationMasterRequestException.java?rev=1501605&view=auto
==============================================================================
--- 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/InvalidApplicationMasterRequestException.java
 (added)
+++ 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/InvalidApplicationMasterRequestException.java
 Tue Jul  9 23:14:27 2013
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+/**
+ * The exception is thrown when an application Master call allocate without
+ * calling RegisterApplicationMaster or try to register more then once.
+ */
+public class InvalidApplicationMasterRequestException extends YarnException {
+
+  private static final long serialVersionUID = 1357686L;
+
+  public InvalidApplicationMasterRequestException(Throwable cause) {
+    super(cause);
+  }
+
+  public InvalidApplicationMasterRequestException(String message) {
+    super(message);
+  }
+
+  public InvalidApplicationMasterRequestException(String message,
+      Throwable cause) {
+    super(message, cause);
+  }
+}

Modified: 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java?rev=1501605&r1=1501604&r2=1501605&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
 (original)
+++ 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
 Tue Jul  9 23:14:27 2013
@@ -78,10 +78,19 @@ public class MockAM {
         finalState, attempt.getAppAttemptState());
   }
 
-  public RegisterApplicationMasterResponse registerAppAttempt() throws 
Exception {
-    waitForState(RMAppAttemptState.LAUNCHED);
+  public RegisterApplicationMasterResponse registerAppAttempt()
+      throws Exception {
+    return registerAppAttempt(true);
+  }
+
+  public RegisterApplicationMasterResponse registerAppAttempt(boolean wait)
+      throws Exception {
+    if (wait) {
+      waitForState(RMAppAttemptState.LAUNCHED);
+    }
     responseId = 0;
-    RegisterApplicationMasterRequest req = 
Records.newRecord(RegisterApplicationMasterRequest.class);
+    RegisterApplicationMasterRequest req =
+        Records.newRecord(RegisterApplicationMasterRequest.class);
     req.setApplicationAttemptId(attemptId);
     req.setHost("");
     req.setRpcPort(1);

Modified: 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java?rev=1501605&r1=1501604&r2=1501605&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
 (original)
+++ 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
 Tue Jul  9 23:14:27 2013
@@ -19,12 +19,14 @@
 package org.apache.hadoop.yarn.server.resourcemanager;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
@@ -34,6 +36,7 @@ import org.apache.hadoop.yarn.api.protoc
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -166,4 +169,58 @@ public class TestApplicationMasterLaunch
     am.waitForState(RMAppAttemptState.FINISHED);
     rm.stop();
   }
+  
+    
+  @SuppressWarnings("unused")
+  @Test(timeout = 100000)
+  public void testallocateBeforeAMRegistration() throws Exception {
+    Logger rootLogger = LogManager.getRootLogger();
+    boolean thrown = false;
+    rootLogger.setLevel(Level.DEBUG);
+    MockRM rm = new MockRM();
+    rm.start();
+    MockNM nm1 = rm.registerNode("h1:1234", 5000);
+    RMApp app = rm.submitApp(2000);
+    // kick the scheduling
+    nm1.nodeHeartbeat(true);
+    RMAppAttempt attempt = app.getCurrentAppAttempt();
+    MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
+
+    // request for containers
+    int request = 2;
+    try {
+      AllocateResponse ar =
+          am.allocate("h1", 1000, request, new ArrayList<ContainerId>());
+    } catch (Exception e) {
+      Assert.assertEquals("Application Master is trying to allocate before "
+          + "registering for: " + attempt.getAppAttemptId().getApplicationId(),
+        e.getMessage());
+      thrown = true;
+    }
+    // kick the scheduler
+    nm1.nodeHeartbeat(true);
+    try {
+      AllocateResponse amrs =
+          am.allocate(new ArrayList<ResourceRequest>(),
+            new ArrayList<ContainerId>());
+    } catch (Exception e) {
+      Assert.assertEquals("Application Master is trying to allocate before "
+          + "registering for: " + attempt.getAppAttemptId().getApplicationId(),
+        e.getMessage());
+      thrown = true;
+    }
+    Assert.assertTrue(thrown);
+    am.registerAppAttempt();
+    thrown = false;
+    try {
+    am.registerAppAttempt(false);
+    }
+    catch (Exception e) {
+      Assert.assertEquals("Application Master is already registered : "
+          + attempt.getAppAttemptId().getApplicationId(),
+        e.getMessage());
+      thrown = true;
+    }
+    Assert.assertTrue(thrown);
+  }
 }


Reply via email to