Author: vinodkv
Date: Thu Jul 17 00:15:28 2014
New Revision: 1611223
URL: http://svn.apache.org/r1611223
Log:
YARN-2219. Changed ResourceManager to avoid AMs and NMs getting exceptions
after RM recovery but before scheduler learns about apps and app-attempts.
Contributed by Jian He.
svn merge --ignore-ancestry -c 1611222 ../../trunk/
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1611223&r1=1611222&r2=1611223&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Thu Jul 17 00:15:28 2014
@@ -44,6 +44,10 @@ Release 2.6.0 - UNRELEASED
YARN-2264. Fixed a race condition in DrainDispatcher which may cause random
test failures. (Li Lu via jianhe)
+ YARN-2219. Changed ResourceManager to avoid AMs and NMs getting exceptions
+ after RM recovery but before scheduler learns about apps and app-attempts.
+ (Jian He via vinodkv)
+
Release 2.5.0 - UNRELEASED
INCOMPATIBLE CHANGES
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1611223&r1=1611222&r2=1611223&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Thu Jul 17 00:15:28 2014
@@ -205,12 +205,6 @@ public class RMAppImpl implements RMApp,
.addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
RMAppEventType.APP_RUNNING_ON_NODE,
new AppRunningOnNodeTransition())
- // ACCECPTED state can once again receive APP_ACCEPTED event, because on
- // recovery the app returns ACCEPTED state and the app once again go
- // through the scheduler and triggers one more APP_ACCEPTED event at
- // ACCEPTED state.
- .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
- RMAppEventType.APP_ACCEPTED)
// Transitions from RUNNING state
.addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
@@ -789,8 +783,18 @@ public class RMAppImpl implements RMApp,
return app.recoveredFinalState;
}
- // Notify scheduler about the app on recovery
- new AddApplicationToSchedulerTransition().transition(app, event);
+ // No existent attempts means the attempt associated with this app was not
+ // started or started but not yet saved.
+ if (app.attempts.isEmpty()) {
+ app.scheduler.handle(new AppAddedSchedulerEvent(app.applicationId,
+ app.submissionContext.getQueue(), app.user));
+ return RMAppState.SUBMITTED;
+ }
+
+ // Add application to scheduler synchronously to guarantee scheduler
+ // knows applications before AM or NM re-registers.
+ app.scheduler.handle(new AppAddedSchedulerEvent(app.applicationId,
+ app.submissionContext.getQueue(), app.user, true));
// recover attempts
app.recoverAppAttempts();
@@ -805,12 +809,6 @@ public class RMAppImpl implements RMApp,
return RMAppState.ACCEPTED;
}
- // No existent attempts means the attempt associated with this app was not
- // started or started but not yet saved.
- if (app.attempts.isEmpty()) {
- return RMAppState.SUBMITTED;
- }
-
// YARN-1507 is saving the application state after the application is
// accepted. So after YARN-1507, an app is saved meaning it is accepted.
// Thus we return ACCECPTED state on recovery.
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1611223&r1=1611222&r2=1611223&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Thu Jul 17 00:15:28 2014
@@ -926,8 +926,10 @@ public class RMAppAttemptImpl implements
appAttempt.masterService
.registerAppAttempt(appAttempt.applicationAttemptId);
- appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent(
- appAttempt.getAppAttemptId(), false, false));
+ // Add attempt to scheduler synchronously to guarantee scheduler
+ // knows attempts before AM or NM re-registers.
+ appAttempt.scheduler.handle(new AppAttemptAddedSchedulerEvent(
+ appAttempt.getAppAttemptId(), false, true));
}
/*
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1611223&r1=1611222&r2=1611223&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Thu Jul 17 00:15:28 2014
@@ -521,7 +521,7 @@ public class CapacityScheduler extends
}
private synchronized void addApplication(ApplicationId applicationId,
- String queueName, String user) {
+ String queueName, String user, boolean isAppRecovering) {
// santiy checks.
CSQueue queue = getQueue(queueName);
if (queue == null) {
@@ -553,14 +553,20 @@ public class CapacityScheduler extends
applications.put(applicationId, application);
LOG.info("Accepted application " + applicationId + " from user: " + user
+ ", in queue: " + queueName);
- rmContext.getDispatcher().getEventHandler()
+ if (isAppRecovering) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED");
+ }
+ } else {
+ rmContext.getDispatcher().getEventHandler()
.handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
+ }
}
private synchronized void addApplicationAttempt(
ApplicationAttemptId applicationAttemptId,
boolean transferStateFromPreviousAttempt,
- boolean shouldNotifyAttemptAdded) {
+ boolean isAttemptRecovering) {
SchedulerApplication<FiCaSchedulerApp> application =
applications.get(applicationAttemptId.getApplicationId());
CSQueue queue = (CSQueue) application.getQueue();
@@ -578,14 +584,15 @@ public class CapacityScheduler extends
LOG.info("Added Application Attempt " + applicationAttemptId
+ " to scheduler from user " + application.getUser() + " in queue "
+ queue.getQueueName());
- if (shouldNotifyAttemptAdded) {
- rmContext.getDispatcher().getEventHandler().handle(
- new RMAppAttemptEvent(applicationAttemptId,
- RMAppAttemptEventType.ATTEMPT_ADDED));
- } else {
+ if (isAttemptRecovering) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Skipping notifying ATTEMPT_ADDED");
+ LOG.debug(applicationAttemptId
+ + " is recovering. Skipping notifying ATTEMPT_ADDED");
}
+ } else {
+ rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppAttemptEvent(applicationAttemptId,
+ RMAppAttemptEventType.ATTEMPT_ADDED));
}
}
@@ -905,7 +912,8 @@ public class CapacityScheduler extends
{
AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
addApplication(appAddedEvent.getApplicationId(),
- appAddedEvent.getQueue(), appAddedEvent.getUser());
+ appAddedEvent.getQueue(), appAddedEvent.getUser(),
+ appAddedEvent.getIsAppRecovering());
}
break;
case APP_REMOVED:
@@ -921,7 +929,7 @@ public class CapacityScheduler extends
(AppAttemptAddedSchedulerEvent) event;
addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
- appAttemptAddedEvent.getShouldNotifyAttemptAdded());
+ appAttemptAddedEvent.getIsAttemptRecovering());
}
break;
case APP_ATTEMPT_REMOVED:
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java?rev=1611223&r1=1611222&r2=1611223&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java Thu Jul 17 00:15:28 2014
@@ -25,13 +25,20 @@ public class AppAddedSchedulerEvent exte
private final ApplicationId applicationId;
private final String queue;
private final String user;
+ private final boolean isAppRecovering;
public AppAddedSchedulerEvent(
ApplicationId applicationId, String queue, String user) {
+ this(applicationId, queue, user, false);
+ }
+
+ public AppAddedSchedulerEvent(ApplicationId applicationId, String queue,
+ String user, boolean isAppRecovering) {
super(SchedulerEventType.APP_ADDED);
this.applicationId = applicationId;
this.queue = queue;
this.user = user;
+ this.isAppRecovering = isAppRecovering;
}
public ApplicationId getApplicationId() {
@@ -46,4 +53,7 @@ public class AppAddedSchedulerEvent exte
return user;
}
+ public boolean getIsAppRecovering() {
+ return isAppRecovering;
+ }
}
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java?rev=1611223&r1=1611222&r2=1611223&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java Thu Jul 17 00:15:28 2014
@@ -24,22 +24,22 @@ public class AppAttemptAddedSchedulerEve
private final ApplicationAttemptId applicationAttemptId;
private final boolean transferStateFromPreviousAttempt;
- private final boolean shouldNotifyAttemptAdded;
+ private final boolean isAttemptRecovering;
public AppAttemptAddedSchedulerEvent(
ApplicationAttemptId applicationAttemptId,
boolean transferStateFromPreviousAttempt) {
- this(applicationAttemptId, transferStateFromPreviousAttempt, true);
+ this(applicationAttemptId, transferStateFromPreviousAttempt, false);
}
public AppAttemptAddedSchedulerEvent(
ApplicationAttemptId applicationAttemptId,
boolean transferStateFromPreviousAttempt,
- boolean shouldNotifyAttemptAdded) {
+ boolean isAttemptRecovering) {
super(SchedulerEventType.APP_ATTEMPT_ADDED);
this.applicationAttemptId = applicationAttemptId;
this.transferStateFromPreviousAttempt = transferStateFromPreviousAttempt;
- this.shouldNotifyAttemptAdded = shouldNotifyAttemptAdded;
+ this.isAttemptRecovering = isAttemptRecovering;
}
public ApplicationAttemptId getApplicationAttemptId() {
@@ -50,7 +50,7 @@ public class AppAttemptAddedSchedulerEve
return transferStateFromPreviousAttempt;
}
- public boolean getShouldNotifyAttemptAdded() {
- return shouldNotifyAttemptAdded;
+ public boolean getIsAttemptRecovering() {
+ return isAttemptRecovering;
}
}
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1611223&r1=1611222&r2=1611223&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Thu Jul 17 00:15:28 2014
@@ -566,7 +566,7 @@ public class FairScheduler extends
* configured limits, but the app will not be marked as runnable.
*/
protected synchronized void addApplication(ApplicationId applicationId,
- String queueName, String user) {
+ String queueName, String user, boolean isAppRecovering) {
if (queueName == null || queueName.isEmpty()) {
String message = "Reject application " + applicationId +
" submitted by user " + user + " with an empty queue name.";
@@ -603,8 +603,14 @@ public class FairScheduler extends
LOG.info("Accepted application " + applicationId + " from user: " + user
+ ", in queue: " + queueName + ", currently num of applications: "
+ applications.size());
- rmContext.getDispatcher().getEventHandler()
+ if (isAppRecovering) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED");
+ }
+ } else {
+ rmContext.getDispatcher().getEventHandler()
.handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
+ }
}
/**
@@ -613,7 +619,7 @@ public class FairScheduler extends
protected synchronized void addApplicationAttempt(
ApplicationAttemptId applicationAttemptId,
boolean transferStateFromPreviousAttempt,
- boolean shouldNotifyAttemptAdded) {
+ boolean isAttemptRecovering) {
SchedulerApplication<FSSchedulerApp> application =
applications.get(applicationAttemptId.getApplicationId());
String user = application.getUser();
@@ -642,14 +648,15 @@ public class FairScheduler extends
LOG.info("Added Application Attempt " + applicationAttemptId
+ " to scheduler from user: " + user);
- if (shouldNotifyAttemptAdded) {
- rmContext.getDispatcher().getEventHandler().handle(
- new RMAppAttemptEvent(applicationAttemptId,
- RMAppAttemptEventType.ATTEMPT_ADDED));
- } else {
+ if (isAttemptRecovering) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Skipping notifying ATTEMPT_ADDED");
+ LOG.debug(applicationAttemptId
+ + " is recovering. Skipping notifying ATTEMPT_ADDED");
}
+ } else {
+ rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppAttemptEvent(applicationAttemptId,
+ RMAppAttemptEventType.ATTEMPT_ADDED));
}
}
@@ -1136,7 +1143,8 @@ public class FairScheduler extends
}
AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
addApplication(appAddedEvent.getApplicationId(),
- appAddedEvent.getQueue(), appAddedEvent.getUser());
+ appAddedEvent.getQueue(), appAddedEvent.getUser(),
+ appAddedEvent.getIsAppRecovering());
break;
case APP_REMOVED:
if (!(event instanceof AppRemovedSchedulerEvent)) {
@@ -1154,7 +1162,7 @@ public class FairScheduler extends
(AppAttemptAddedSchedulerEvent) event;
addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
- appAttemptAddedEvent.getShouldNotifyAttemptAdded());
+ appAttemptAddedEvent.getIsAttemptRecovering());
break;
case APP_ATTEMPT_REMOVED:
if (!(event instanceof AppAttemptRemovedSchedulerEvent)) {
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1611223&r1=1611222&r2=1611223&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Thu Jul 17 00:15:28 2014
@@ -356,22 +356,28 @@ public class FifoScheduler extends
@VisibleForTesting
public synchronized void addApplication(ApplicationId applicationId,
- String queue, String user) {
+ String queue, String user, boolean isAppRecovering) {
SchedulerApplication<FiCaSchedulerApp> application =
new SchedulerApplication<FiCaSchedulerApp>(DEFAULT_QUEUE, user);
applications.put(applicationId, application);
metrics.submitApp(user);
LOG.info("Accepted application " + applicationId + " from user: " + user
+ ", currently num of applications: " + applications.size());
- rmContext.getDispatcher().getEventHandler()
+ if (isAppRecovering) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED");
+ }
+ } else {
+ rmContext.getDispatcher().getEventHandler()
.handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
+ }
}
@VisibleForTesting
public synchronized void
addApplicationAttempt(ApplicationAttemptId appAttemptId,
boolean transferStateFromPreviousAttempt,
- boolean shouldNotifyAttemptAdded) {
+ boolean isAttemptRecovering) {
SchedulerApplication<FiCaSchedulerApp> application =
applications.get(appAttemptId.getApplicationId());
String user = application.getUser();
@@ -389,14 +395,15 @@ public class FifoScheduler extends
metrics.submitAppAttempt(user);
LOG.info("Added Application Attempt " + appAttemptId
+ " to scheduler from user " + application.getUser());
- if (shouldNotifyAttemptAdded) {
- rmContext.getDispatcher().getEventHandler().handle(
- new RMAppAttemptEvent(appAttemptId,
- RMAppAttemptEventType.ATTEMPT_ADDED));
- } else {
+ if (isAttemptRecovering) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Skipping notifying ATTEMPT_ADDED");
+ LOG.debug(appAttemptId
+ + " is recovering. Skipping notifying ATTEMPT_ADDED");
}
+ } else {
+ rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppAttemptEvent(appAttemptId,
+ RMAppAttemptEventType.ATTEMPT_ADDED));
}
}
@@ -772,7 +779,8 @@ public class FifoScheduler extends
{
AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
addApplication(appAddedEvent.getApplicationId(),
- appAddedEvent.getQueue(), appAddedEvent.getUser());
+ appAddedEvent.getQueue(), appAddedEvent.getUser(),
+ appAddedEvent.getIsAppRecovering());
}
break;
case APP_REMOVED:
@@ -788,7 +796,7 @@ public class FifoScheduler extends
(AppAttemptAddedSchedulerEvent) event;
addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
- appAttemptAddedEvent.getShouldNotifyAttemptAdded());
+ appAttemptAddedEvent.getIsAttemptRecovering());
}
break;
case APP_ATTEMPT_REMOVED:
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java?rev=1611223&r1=1611222&r2=1611223&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java Thu Jul 17 00:15:28 2014
@@ -228,7 +228,7 @@ public class TestFifoScheduler {
scheduler.handle(new NodeAddedSchedulerEvent(node));
ApplicationId appId = ApplicationId.newInstance(0, 1);
- scheduler.addApplication(appId, "queue1", "user1");
+ scheduler.addApplication(appId, "queue1", "user1", true);
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
try {
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java?rev=1611223&r1=1611222&r2=1611223&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java Thu Jul 17 00:15:28 2014
@@ -610,6 +610,36 @@ public class TestWorkPreservingRMRestart
attempt0.getMasterContainer().getId()).isAMContainer());
}
+ @Test (timeout = 20000)
+ public void testRecoverSchedulerAppAndAttemptSynchronously() throws Exception {
+ // start RM
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
+ memStore.init(conf);
+ rm1 = new MockRM(conf, memStore);
+ rm1.start();
+ MockNM nm1 =
+ new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
+ nm1.registerNode();
+
+ // create app and launch the AM
+ RMApp app0 = rm1.submitApp(200);
+ MockAM am0 = MockRM.launchAndRegisterAM(app0, rm1, nm1);
+
+ rm2 = new MockRM(conf, memStore);
+ rm2.start();
+ nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+ // scheduler app/attempt is immediately available after RM is re-started.
+ Assert.assertNotNull(rm2.getResourceScheduler().getSchedulerAppInfo(
+ am0.getApplicationAttemptId()));
+
+ // getTransferredContainers should not throw NPE.
+ ((AbstractYarnScheduler) rm2.getResourceScheduler())
+ .getTransferredContainers(am0.getApplicationAttemptId());
+
+ List<NMContainerStatus> containers = createNMContainerStatusForApp(am0);
+ nm1.registerNode(containers, null);
+ waitForNumContainersToRecover(2, rm2, am0.getApplicationAttemptId());
+ }
private void asserteMetrics(QueueMetrics qm, int appsSubmitted,
int appsPending, int appsRunning, int appsCompleted,
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java?rev=1611223&r1=1611222&r2=1611223&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java Thu Jul 17 00:15:28 2014
@@ -147,7 +147,7 @@ public class FairSchedulerTestBase {
int memory, int vcores, String queueId, String userId, int numContainers,
int priority) {
ApplicationAttemptId id = createAppAttemptId(this.APP_ID++,
this.ATTEMPT_ID++);
- scheduler.addApplication(id.getApplicationId(), queueId, userId);
+ scheduler.addApplication(id.getApplicationId(), queueId, userId, true);
// This conditional is for testAclSubmitApplication where app is rejected
// and no app is added.
if (scheduler.getSchedulerApplications().containsKey(id.getApplicationId())) {
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1611223&r1=1611222&r2=1611223&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java Thu Jul 17 00:15:28 2014
@@ -793,13 +793,13 @@ public class TestFairScheduler extends F
scheduler.reinitialize(conf, resourceManager.getRMContext());
ApplicationAttemptId id11 = createAppAttemptId(1, 1);
- scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1");
+ scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1", true);
scheduler.addApplicationAttempt(id11, false, true);
ApplicationAttemptId id21 = createAppAttemptId(2, 1);
- scheduler.addApplication(id21.getApplicationId(), "root.queue2", "user1");
+ scheduler.addApplication(id21.getApplicationId(), "root.queue2", "user1", true);
scheduler.addApplicationAttempt(id21, false, true);
ApplicationAttemptId id22 = createAppAttemptId(2, 2);
- scheduler.addApplication(id22.getApplicationId(), "root.queue2", "user1");
+ scheduler.addApplication(id22.getApplicationId(), "root.queue2", "user1", true);
scheduler.addApplicationAttempt(id22, false, true);
int minReqSize =
@@ -1561,7 +1561,7 @@ public class TestFairScheduler extends F
scheduler.handle(nodeEvent2);
ApplicationAttemptId appId = createAppAttemptId(this.APP_ID++,
this.ATTEMPT_ID++);
- scheduler.addApplication(appId.getApplicationId(), "queue1", "user1");
+ scheduler.addApplication(appId.getApplicationId(), "queue1", "user1", true);
scheduler.addApplicationAttempt(appId, false, true);
// 1 request with 2 nodes on the same rack. another request with 1 node on
@@ -1843,7 +1843,7 @@ public class TestFairScheduler extends F
ApplicationAttemptId attId =
ApplicationAttemptId.newInstance(applicationId, this.ATTEMPT_ID++);
- scheduler.addApplication(attId.getApplicationId(), queue, user);
+ scheduler.addApplication(attId.getApplicationId(), queue, user, true);
numTries = 0;
while (application.getFinishTime() == 0 && numTries < MAX_TRIES) {
@@ -2720,7 +2720,7 @@ public class TestFairScheduler extends F
// send application request
ApplicationAttemptId appAttemptId =
createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
- fs.addApplication(appAttemptId.getApplicationId(), "queue11", "user11");
+ fs.addApplication(appAttemptId.getApplicationId(), "queue11", "user11", true);
fs.addApplicationAttempt(appAttemptId, false, true);
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
ResourceRequest request =