YARN-8298. Added express upgrade for YARN service. Contributed by Chandni Singh
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e557c6bd Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e557c6bd Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e557c6bd Branch: refs/heads/HDFS-12943 Commit: e557c6bd8de2811a561210f672f47b4d07a9d5c6 Parents: 9c3fc3e Author: Eric Yang <ey...@apache.org> Authored: Tue Aug 21 19:49:26 2018 -0400 Committer: Eric Yang <ey...@apache.org> Committed: Tue Aug 21 19:49:26 2018 -0400 ---------------------------------------------------------------------- .../yarn/service/client/ApiServiceClient.java | 20 + .../hadoop/yarn/service/webapp/ApiServer.java | 12 +- .../hadoop/yarn/service/ClientAMService.java | 2 +- .../hadoop/yarn/service/ServiceEvent.java | 25 + .../hadoop/yarn/service/ServiceManager.java | 127 +++- .../hadoop/yarn/service/ServiceScheduler.java | 15 +- .../yarn/service/api/records/ServiceState.java | 2 +- .../yarn/service/client/ServiceClient.java | 100 ++- .../yarn/service/component/Component.java | 16 +- .../yarn/service/component/ComponentEvent.java | 10 + .../component/instance/ComponentInstance.java | 5 + .../yarn/service/utils/ServiceApiUtil.java | 44 ++ .../src/main/proto/ClientAMProtocol.proto | 1 + .../hadoop/yarn/service/TestServiceApiUtil.java | 653 ---------------- .../hadoop/yarn/service/TestServiceManager.java | 299 +++++--- .../yarn/service/TestYarnNativeServices.java | 35 + .../yarn/service/utils/TestServiceApiUtil.java | 743 +++++++++++++++++++ .../hadoop/yarn/client/cli/ApplicationCLI.java | 20 +- .../hadoop/yarn/client/cli/TestYarnCLI.java | 4 + .../hadoop/yarn/client/api/AppAdminClient.java | 12 + 20 files changed, 1308 insertions(+), 837 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e557c6bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java index 9229446..ca6cc50 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java @@ -601,6 +601,26 @@ public class ApiServiceClient extends AppAdminClient { } @Override + public int actionUpgradeExpress(String appName, File path) + throws IOException, YarnException { + int result; + try { + Service service = + loadAppJsonFromLocalFS(path.getAbsolutePath(), appName, null, null); + service.setState(ServiceState.EXPRESS_UPGRADING); + String buffer = jsonSerDeser.toJson(service); + LOG.info("Upgrade in progress. 
Please wait.."); + ClientResponse response = getApiClient(getServicePath(appName)) + .put(ClientResponse.class, buffer); + result = processResponse(response); + } catch (Exception e) { + LOG.error("Failed to upgrade application: ", e); + result = EXIT_EXCEPTION_THROWN; + } + return result; + } + + @Override public int initiateUpgrade(String appName, String fileName, boolean autoFinalize) throws IOException, YarnException { int result; http://git-wip-us.apache.org/repos/asf/hadoop/blob/e557c6bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java index 4db0ac8..cd6f0d7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java @@ -440,7 +440,8 @@ public class ApiServer { if (updateServiceData.getState() != null && ( updateServiceData.getState() == ServiceState.UPGRADING || updateServiceData.getState() == - ServiceState.UPGRADING_AUTO_FINALIZE)) { + ServiceState.UPGRADING_AUTO_FINALIZE) || + updateServiceData.getState() == ServiceState.EXPRESS_UPGRADING) { return upgradeService(updateServiceData, ugi); } @@ -690,7 +691,11 @@ public class ApiServer { ServiceClient sc = getServiceClient(); sc.init(YARN_CONFIG); sc.start(); - 
sc.initiateUpgrade(service); + if (service.getState().equals(ServiceState.EXPRESS_UPGRADING)) { + sc.actionUpgradeExpress(service); + } else { + sc.initiateUpgrade(service); + } sc.close(); return null; }); @@ -706,7 +711,8 @@ public class ApiServer { String serviceName, Set<String> compNames) throws YarnException, IOException, InterruptedException { Service service = getServiceFromClient(ugi, serviceName); - if (service.getState() != ServiceState.UPGRADING) { + if (!service.getState().equals(ServiceState.UPGRADING) && + !service.getState().equals(ServiceState.UPGRADING_AUTO_FINALIZE)) { throw new YarnException( String.format("The upgrade of service %s has not been initiated.", service.getName())); http://git-wip-us.apache.org/repos/asf/hadoop/blob/e557c6bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java index 5bf1833..2ef8f7e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java @@ -166,7 +166,7 @@ public class ClientAMService extends AbstractService LOG.info("Upgrading service to version {} by {}", request.getVersion(), UserGroupInformation.getCurrentUser()); 
context.getServiceManager().processUpgradeRequest(request.getVersion(), - request.getAutoFinalize()); + request.getAutoFinalize(), request.getExpressUpgrade()); return UpgradeServiceResponseProto.newBuilder().build(); } catch (Exception ex) { return UpgradeServiceResponseProto.newBuilder().setError(ex.getMessage()) http://git-wip-us.apache.org/repos/asf/hadoop/blob/e557c6bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java index 0196be2..3a55472 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java @@ -19,6 +19,9 @@ package org.apache.hadoop.yarn.service; import org.apache.hadoop.yarn.event.AbstractEvent; +import org.apache.hadoop.yarn.service.api.records.Component; + +import java.util.Queue; /** * Events are handled by {@link ServiceManager} to manage the service @@ -29,6 +32,8 @@ public class ServiceEvent extends AbstractEvent<ServiceEventType> { private final ServiceEventType type; private String version; private boolean autoFinalize; + private boolean expressUpgrade; + private Queue<Component> compsToUpgradeInOrder; public ServiceEvent(ServiceEventType serviceEventType) { super(serviceEventType); @@ -56,4 +61,24 @@ public class ServiceEvent 
extends AbstractEvent<ServiceEventType> { this.autoFinalize = autoFinalize; return this; } + + public boolean isExpressUpgrade() { + return expressUpgrade; + } + + public ServiceEvent setExpressUpgrade(boolean expressUpgrade) { + this.expressUpgrade = expressUpgrade; + return this; + } + + public Queue<Component> getCompsToUpgradeInOrder() { + return compsToUpgradeInOrder; + } + + public ServiceEvent setCompsToUpgradeInOrder( + Queue<Component> compsToUpgradeInOrder) { + this.compsToUpgradeInOrder = compsToUpgradeInOrder; + return this; + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e557c6bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java index 05ecb3f..04454b1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java @@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.service.api.records.ComponentState; import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.api.records.ServiceState; 
import org.apache.hadoop.yarn.service.component.Component; @@ -40,8 +41,11 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.text.MessageFormat; import java.util.EnumSet; +import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.concurrent.locks.ReentrantReadWriteLock; import static org.apache.hadoop.yarn.service.utils.ServiceApiUtil.jsonSerDeser; @@ -67,6 +71,8 @@ public class ServiceManager implements EventHandler<ServiceEvent> { private final SliderFileSystem fs; private String upgradeVersion; + private Queue<org.apache.hadoop.yarn.service.api.records + .Component> compsToUpgradeInOrder; private static final StateMachineFactory<ServiceManager, State, ServiceEventType, ServiceEvent> STATE_MACHINE_FACTORY = @@ -141,14 +147,20 @@ public class ServiceManager implements EventHandler<ServiceEvent> { @Override public State transition(ServiceManager serviceManager, ServiceEvent event) { + serviceManager.upgradeVersion = event.getVersion(); try { - if (!event.isAutoFinalize()) { - serviceManager.serviceSpec.setState(ServiceState.UPGRADING); + if (event.isExpressUpgrade()) { + serviceManager.serviceSpec.setState(ServiceState.EXPRESS_UPGRADING); + serviceManager.compsToUpgradeInOrder = event + .getCompsToUpgradeInOrder(); + serviceManager.upgradeNextCompIfAny(); + } else if (event.isAutoFinalize()) { + serviceManager.serviceSpec.setState(ServiceState + .UPGRADING_AUTO_FINALIZE); } else { serviceManager.serviceSpec.setState( - ServiceState.UPGRADING_AUTO_FINALIZE); + ServiceState.UPGRADING); } - serviceManager.upgradeVersion = event.getVersion(); return State.UPGRADING; } catch (Throwable e) { LOG.error("[SERVICE]: Upgrade to version {} failed", event.getVersion(), @@ -169,8 +181,19 @@ public class ServiceManager implements EventHandler<ServiceEvent> { if (currState.equals(ServiceState.STABLE)) { return State.STABLE; } + if 
(currState.equals(ServiceState.EXPRESS_UPGRADING)) { + org.apache.hadoop.yarn.service.api.records.Component component = + serviceManager.compsToUpgradeInOrder.peek(); + if (!component.getState().equals(ComponentState.NEEDS_UPGRADE) && + !component.getState().equals(ComponentState.UPGRADING)) { + serviceManager.compsToUpgradeInOrder.remove(); + } + serviceManager.upgradeNextCompIfAny(); + } if (currState.equals(ServiceState.UPGRADING_AUTO_FINALIZE) || - event.getType().equals(ServiceEventType.START)) { + event.getType().equals(ServiceEventType.START) || + (currState.equals(ServiceState.EXPRESS_UPGRADING) && + serviceManager.compsToUpgradeInOrder.isEmpty())) { ServiceState targetState = checkIfStable(serviceManager.serviceSpec); if (targetState.equals(ServiceState.STABLE)) { if (serviceManager.finalizeUpgrade()) { @@ -184,6 +207,19 @@ public class ServiceManager implements EventHandler<ServiceEvent> { } } + private void upgradeNextCompIfAny() { + if (!compsToUpgradeInOrder.isEmpty()) { + org.apache.hadoop.yarn.service.api.records.Component component = + compsToUpgradeInOrder.peek(); + + ComponentEvent needUpgradeEvent = new ComponentEvent( + component.getName(), ComponentEventType.UPGRADE).setTargetSpec( + component).setUpgradeVersion(upgradeVersion).setExpressUpgrade(true); + context.scheduler.getDispatcher().getEventHandler().handle( + needUpgradeEvent); + } + } + /** * @return whether finalization of upgrade was successful. */ @@ -250,23 +286,18 @@ public class ServiceManager implements EventHandler<ServiceEvent> { } void processUpgradeRequest(String upgradeVersion, - boolean autoFinalize) throws IOException { + boolean autoFinalize, boolean expressUpgrade) throws IOException { Service targetSpec = ServiceApiUtil.loadServiceUpgrade( context.fs, context.service.getName(), upgradeVersion); List<org.apache.hadoop.yarn.service.api.records.Component> - compsThatNeedUpgrade = componentsFinder. + compsNeedUpgradeList = componentsFinder. 
findTargetComponentSpecs(context.service, targetSpec); - ServiceEvent event = new ServiceEvent(ServiceEventType.UPGRADE) - .setVersion(upgradeVersion) - .setAutoFinalize(autoFinalize); - context.scheduler.getDispatcher().getEventHandler().handle(event); - if (compsThatNeedUpgrade != null && !compsThatNeedUpgrade.isEmpty()) { - if (autoFinalize) { - event.setAutoFinalize(true); - } - compsThatNeedUpgrade.forEach(component -> { + // remove all components from need upgrade list if their restart policy + // doesn't allow upgrade. + if (compsNeedUpgradeList != null) { + compsNeedUpgradeList.removeIf(component -> { org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum restartPolicy = component.getRestartPolicy(); @@ -274,25 +305,65 @@ public class ServiceManager implements EventHandler<ServiceEvent> { Component.getRestartPolicyHandler(restartPolicy); // Do not allow upgrades for components which have NEVER/ON_FAILURE // restart policy - if (restartPolicyHandler.allowUpgrades()) { + if (!restartPolicyHandler.allowUpgrades()) { + LOG.info("The component {} has a restart policy that doesnt " + + "allow upgrades {} ", component.getName(), + component.getRestartPolicy().toString()); + return true; + } + + return false; + }); + } + + ServiceEvent event = new ServiceEvent(ServiceEventType.UPGRADE) + .setVersion(upgradeVersion) + .setAutoFinalize(autoFinalize) + .setExpressUpgrade(expressUpgrade); + + if (expressUpgrade) { + // In case of express upgrade components need to be upgraded in order. + // Once the service manager gets notified that a component finished + // upgrading, it then issues event to upgrade the next component. 
+ Map<String, org.apache.hadoop.yarn.service.api.records.Component> + compsNeedUpgradeByName = new HashMap<>(); + if (compsNeedUpgradeList != null) { + compsNeedUpgradeList.forEach(component -> + compsNeedUpgradeByName.put(component.getName(), component)); + } + List<String> resolvedComps = ServiceApiUtil + .resolveCompsDependency(targetSpec); + + Queue<org.apache.hadoop.yarn.service.api.records.Component> + orderedCompUpgrade = new LinkedList<>(); + resolvedComps.forEach(compName -> { + org.apache.hadoop.yarn.service.api.records.Component component = + compsNeedUpgradeByName.get(compName); + if (component != null ) { + orderedCompUpgrade.add(component); + } + }); + event.setCompsToUpgradeInOrder(orderedCompUpgrade); + } + + context.scheduler.getDispatcher().getEventHandler().handle(event); + + if (compsNeedUpgradeList != null && !compsNeedUpgradeList.isEmpty()) { + if (!expressUpgrade) { + compsNeedUpgradeList.forEach(component -> { ComponentEvent needUpgradeEvent = new ComponentEvent( component.getName(), ComponentEventType.UPGRADE).setTargetSpec( component).setUpgradeVersion(event.getVersion()); context.scheduler.getDispatcher().getEventHandler().handle( needUpgradeEvent); - } else { - LOG.info("The component {} has a restart " - + "policy that doesnt allow upgrades {} ", component.getName(), - component.getRestartPolicy().toString()); - } - }); - } else { + + }); + } + } else if (autoFinalize) { // nothing to upgrade if upgrade auto finalize is requested, trigger a // state check. 
- if (autoFinalize) { - context.scheduler.getDispatcher().getEventHandler().handle( - new ServiceEvent(ServiceEventType.CHECK_STABLE)); - } + context.scheduler.getDispatcher().getEventHandler().handle( + new ServiceEvent(ServiceEventType.CHECK_STABLE)); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e557c6bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java index 0801ad0..384659f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java @@ -219,7 +219,7 @@ public class ServiceScheduler extends CompositeService { nmClient.getClient().cleanupRunningContainersOnStop(false); addIfService(nmClient); - dispatcher = new AsyncDispatcher("Component dispatcher"); + dispatcher = createAsyncDispatcher(); dispatcher.register(ServiceEventType.class, new ServiceEventHandler()); dispatcher.register(ComponentEventType.class, new ComponentEventHandler()); @@ -253,6 +253,9 @@ public class ServiceScheduler extends CompositeService { YarnServiceConf.CONTAINER_RECOVERY_TIMEOUT_MS, YarnServiceConf.DEFAULT_CONTAINER_RECOVERY_TIMEOUT_MS, app.getConfiguration(), getConfig()); + + serviceManager = createServiceManager(); + 
context.setServiceManager(serviceManager); } protected YarnRegistryViewForProviders createYarnRegistryOperations( @@ -262,6 +265,14 @@ public class ServiceScheduler extends CompositeService { context.attemptId); } + protected ServiceManager createServiceManager() { + return new ServiceManager(context); + } + + protected AsyncDispatcher createAsyncDispatcher() { + return new AsyncDispatcher("Component dispatcher"); + } + protected NMClientAsync createNMClient() { return NMClientAsync.createNMClientAsync(new NMClientCallback()); } @@ -344,8 +355,6 @@ public class ServiceScheduler extends CompositeService { // Since AM has been started and registered, the service is in STARTED state app.setState(ServiceState.STARTED); - serviceManager = new ServiceManager(context); - context.setServiceManager(serviceManager); // recover components based on containers sent from RM recoverComponents(response); http://git-wip-us.apache.org/repos/asf/hadoop/blob/e557c6bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java index b6ae38b..0b3c037 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java @@ -30,5 +30,5 
@@ import org.apache.hadoop.classification.InterfaceStability; @javax.annotation.Generated(value = "class io.swagger.codegen.languages.JavaClientCodegen", date = "2016-06-02T08:15:05.615-07:00") public enum ServiceState { ACCEPTED, STARTED, STABLE, STOPPED, FAILED, FLEX, UPGRADING, - UPGRADING_AUTO_FINALIZE; + UPGRADING_AUTO_FINALIZE, EXPRESS_UPGRADING; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e557c6bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java index 5668d9f..a27ed87 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.service.client; import com.google.common.annotations.VisibleForTesting; + import org.apache.commons.lang3.StringUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; @@ -215,48 +216,31 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes, return EXIT_SUCCESS; } - @Override - public int initiateUpgrade(String appName, String fileName, - boolean autoFinalize) - throws IOException, YarnException { - Service upgradeService = 
loadAppJsonFromLocalFS(fileName, appName, - null, null); - if (autoFinalize) { - upgradeService.setState(ServiceState.UPGRADING_AUTO_FINALIZE); - } else { - upgradeService.setState(ServiceState.UPGRADING); - } - return initiateUpgrade(upgradeService); - } - - public int initiateUpgrade(Service service) throws YarnException, - IOException { + private ApplicationReport upgradePrecheck(Service service) + throws YarnException, IOException { boolean upgradeEnabled = getConfig().getBoolean( - YARN_SERVICE_UPGRADE_ENABLED, - YARN_SERVICE_UPGRADE_ENABLED_DEFAULT); + YARN_SERVICE_UPGRADE_ENABLED, YARN_SERVICE_UPGRADE_ENABLED_DEFAULT); if (!upgradeEnabled) { throw new YarnException(ErrorStrings.SERVICE_UPGRADE_DISABLED); } - Service persistedService = - ServiceApiUtil.loadService(fs, service.getName()); + Service persistedService = ServiceApiUtil.loadService(fs, + service.getName()); if (!StringUtils.isEmpty(persistedService.getId())) { - cachedAppInfo.put(persistedService.getName(), new AppInfo( - ApplicationId.fromString(persistedService.getId()), - persistedService.getKerberosPrincipal().getPrincipalName())); + cachedAppInfo.put(persistedService.getName(), + new AppInfo(ApplicationId.fromString(persistedService.getId()), + persistedService.getKerberosPrincipal().getPrincipalName())); } if (persistedService.getVersion().equals(service.getVersion())) { - String message = - service.getName() + " is already at version " + service.getVersion() - + ". There is nothing to upgrade."; + String message = service.getName() + " is already at version " + + service.getVersion() + ". 
There is nothing to upgrade."; LOG.error(message); throw new YarnException(message); } Service liveService = getStatus(service.getName()); if (!liveService.getState().equals(ServiceState.STABLE)) { - String message = service.getName() + " is at " + - liveService.getState() + String message = service.getName() + " is at " + liveService.getState() + " state and upgrade can only be initiated when service is STABLE."; LOG.error(message); throw new YarnException(message); @@ -266,11 +250,67 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes, ServiceApiUtil.validateAndResolveService(service, fs, getConfig()); ServiceApiUtil.createDirAndPersistApp(fs, serviceUpgradeDir, service); - ApplicationReport appReport = - yarnClient.getApplicationReport(getAppId(service.getName())); + ApplicationReport appReport = yarnClient + .getApplicationReport(getAppId(service.getName())); if (StringUtils.isEmpty(appReport.getHost())) { throw new YarnException(service.getName() + " AM hostname is empty"); } + return appReport; + } + + @Override + public int actionUpgradeExpress(String appName, File path) + throws IOException, YarnException { + Service service = + loadAppJsonFromLocalFS(path.getAbsolutePath(), appName, null, null); + service.setState(ServiceState.UPGRADING_AUTO_FINALIZE); + actionUpgradeExpress(service); + return EXIT_SUCCESS; + } + + public int actionUpgradeExpress(Service service) throws YarnException, + IOException { + ApplicationReport appReport = upgradePrecheck(service); + ClientAMProtocol proxy = createAMProxy(service.getName(), appReport); + UpgradeServiceRequestProto.Builder requestBuilder = + UpgradeServiceRequestProto.newBuilder(); + requestBuilder.setVersion(service.getVersion()); + if (service.getState().equals(ServiceState.UPGRADING_AUTO_FINALIZE)) { + requestBuilder.setAutoFinalize(true); + } + if (service.getState().equals(ServiceState.EXPRESS_UPGRADING)) { + requestBuilder.setExpressUpgrade(true); + 
requestBuilder.setAutoFinalize(true); + } + UpgradeServiceResponseProto responseProto = proxy.upgrade( + requestBuilder.build()); + if (responseProto.hasError()) { + LOG.error("Service {} express upgrade to version {} failed because {}", + service.getName(), service.getVersion(), responseProto.getError()); + throw new YarnException("Failed to express upgrade service " + + service.getName() + " to version " + service.getVersion() + + " because " + responseProto.getError()); + } + return EXIT_SUCCESS; + } + + @Override + public int initiateUpgrade(String appName, String fileName, + boolean autoFinalize) + throws IOException, YarnException { + Service upgradeService = loadAppJsonFromLocalFS(fileName, appName, + null, null); + if (autoFinalize) { + upgradeService.setState(ServiceState.UPGRADING_AUTO_FINALIZE); + } else { + upgradeService.setState(ServiceState.UPGRADING); + } + return initiateUpgrade(upgradeService); + } + + public int initiateUpgrade(Service service) throws YarnException, + IOException { + ApplicationReport appReport = upgradePrecheck(service); ClientAMProtocol proxy = createAMProxy(service.getName(), appReport); UpgradeServiceRequestProto.Builder requestBuilder = http://git-wip-us.apache.org/repos/asf/hadoop/blob/e557c6bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java index 41a2fcd..acf3404 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.service.component; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ExecutionType; import static org.apache.hadoop.yarn.service.api.records.Component @@ -43,6 +44,7 @@ import org.apache.hadoop.yarn.service.ServiceEventType; import org.apache.hadoop.yarn.service.api.records.ContainerState; import org.apache.hadoop.yarn.service.api.records.ResourceInformation; import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; +import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType; import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceId; import org.apache.hadoop.yarn.service.ContainerFailureTracker; import org.apache.hadoop.yarn.service.ServiceContext; @@ -546,13 +548,21 @@ public class Component implements EventHandler<ComponentEvent> { @Override public void transition(Component component, ComponentEvent event) { component.upgradeInProgress.set(true); + component.upgradeEvent = event; component.componentSpec.setState(org.apache.hadoop.yarn.service.api. 
records.ComponentState.NEEDS_UPGRADE); component.numContainersThatNeedUpgrade.set( component.componentSpec.getNumberOfContainers()); - component.componentSpec.getContainers().forEach(container -> - container.setState(ContainerState.NEEDS_UPGRADE)); - component.upgradeEvent = event; + component.componentSpec.getContainers().forEach(container -> { + container.setState(ContainerState.NEEDS_UPGRADE); + if (event.isExpressUpgrade()) { + ComponentInstanceEvent upgradeEvent = new ComponentInstanceEvent( + ContainerId.fromString(container.getId()), + ComponentInstanceEventType.UPGRADE); + LOG.info("Upgrade container {}", container.getId()); + component.dispatcher.getEventHandler().handle(upgradeEvent); + } + }); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e557c6bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java index 84caa77..643961d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java @@ -35,6 +35,7 @@ public class ComponentEvent extends AbstractEvent<ComponentEventType> { private ContainerId containerId; private org.apache.hadoop.yarn.service.api.records.Component targetSpec; 
private String upgradeVersion; + private boolean expressUpgrade; public ContainerId getContainerId() { return containerId; @@ -113,4 +114,13 @@ public class ComponentEvent extends AbstractEvent<ComponentEventType> { this.upgradeVersion = upgradeVersion; return this; } + + public boolean isExpressUpgrade() { + return expressUpgrade; + } + + public ComponentEvent setExpressUpgrade(boolean expressUpgrade) { + this.expressUpgrade = expressUpgrade; + return this; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e557c6bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java index 11a6caa..ed5e68e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java @@ -380,6 +380,11 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>, @Override public void transition(ComponentInstance compInstance, ComponentInstanceEvent event) { + if (!compInstance.containerSpec.getState().equals( + ContainerState.NEEDS_UPGRADE)) { + //nothing to upgrade. this may happen with express upgrade. 
+ return; + } compInstance.containerSpec.setState(ContainerState.UPGRADING); compInstance.component.decContainersReady(false); ComponentEvent upgradeEvent = compInstance.component.getUpgradeEvent(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/e557c6bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java index 9219569..b588e88 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java @@ -638,6 +638,32 @@ public class ServiceApiUtil { return containerNeedUpgrade; } + /** + * Validates the components that are requested are stable for upgrade. + * It returns the instances of the components which are in ready state. 
+ */ + public static List<Container> validateAndResolveCompsStable( + Service liveService, Collection<String> compNames) throws YarnException { + Preconditions.checkNotNull(compNames); + HashSet<String> requestedComps = Sets.newHashSet(compNames); + List<Container> containerNeedUpgrade = new ArrayList<>(); + for (Component liveComp : liveService.getComponents()) { + if (requestedComps.contains(liveComp.getName())) { + if (!liveComp.getState().equals(ComponentState.STABLE)) { + // Nothing to upgrade + throw new YarnException(String.format( + ERROR_COMP_DOES_NOT_NEED_UPGRADE, liveComp.getName())); + } + liveComp.getContainers().forEach(liveContainer -> { + if (liveContainer.getState().equals(ContainerState.READY)) { + containerNeedUpgrade.add(liveContainer); + } + }); + } + } + return containerNeedUpgrade; + } + private static String parseComponentName(String componentInstanceName) throws YarnException { int idx = componentInstanceName.lastIndexOf('-'); @@ -651,4 +677,22 @@ public class ServiceApiUtil { public static String $(String s) { return "${" + s +"}"; } + + public static List<String> resolveCompsDependency(Service service) { + List<String> components = new ArrayList<String>(); + for (Component component : service.getComponents()) { + int depSize = component.getDependencies().size(); + if (!components.contains(component.getName())) { + components.add(component.getName()); + } + if (depSize != 0) { + for (String depComp : component.getDependencies()) { + if (!components.contains(depComp)) { + components.add(0, depComp); + } + } + } + } + return components; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e557c6bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto index 6166ded..169f765 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto @@ -66,6 +66,7 @@ message StopResponseProto { message UpgradeServiceRequestProto { optional string version = 1; optional bool autoFinalize = 2; + optional bool expressUpgrade = 3; } message UpgradeServiceResponseProto { http://git-wip-us.apache.org/repos/asf/hadoop/blob/e557c6bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceApiUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceApiUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceApiUtil.java deleted file mode 100644 index c2a80e7..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceApiUtil.java +++ /dev/null @@ -1,653 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.service; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.registry.client.api.RegistryConstants; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.service.api.records.Artifact; -import org.apache.hadoop.yarn.service.api.records.Component; -import org.apache.hadoop.yarn.service.api.records.KerberosPrincipal; -import org.apache.hadoop.yarn.service.api.records.PlacementConstraint; -import org.apache.hadoop.yarn.service.api.records.PlacementPolicy; -import org.apache.hadoop.yarn.service.api.records.PlacementScope; -import org.apache.hadoop.yarn.service.api.records.PlacementType; -import org.apache.hadoop.yarn.service.api.records.Resource; -import org.apache.hadoop.yarn.service.api.records.Service; -import org.apache.hadoop.yarn.service.exceptions.RestApiErrorMessages; -import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; -import org.apache.hadoop.yarn.service.utils.SliderFileSystem; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.List; - -import static org.apache.hadoop.yarn.service.conf.RestApiConstants.DEFAULT_UNLIMITED_LIFETIME; -import static 
org.apache.hadoop.yarn.service.exceptions.RestApiErrorMessages.*; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - -/** - * Test for ServiceApiUtil helper methods. - */ -public class TestServiceApiUtil { - private static final Logger LOG = LoggerFactory - .getLogger(TestServiceApiUtil.class); - private static final String EXCEPTION_PREFIX = "Should have thrown " + - "exception: "; - private static final String NO_EXCEPTION_PREFIX = "Should not have thrown " + - "exception: "; - - private static final String LEN_64_STR = - "abcdefghijklmnopqrstuvwxyz0123456789abcdefghijklmnopqrstuvwxyz01"; - - private static final YarnConfiguration CONF_DEFAULT_DNS = new - YarnConfiguration(); - private static final YarnConfiguration CONF_DNS_ENABLED = new - YarnConfiguration(); - - @BeforeClass - public static void init() { - CONF_DNS_ENABLED.setBoolean(RegistryConstants.KEY_DNS_ENABLED, true); - } - - @Test(timeout = 90000) - public void testResourceValidation() throws Exception { - assertEquals(RegistryConstants.MAX_FQDN_LABEL_LENGTH + 1, LEN_64_STR - .length()); - - SliderFileSystem sfs = ServiceTestUtils.initMockFs(); - - Service app = new Service(); - - // no name - try { - ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED); - Assert.fail(EXCEPTION_PREFIX + "service with no name"); - } catch (IllegalArgumentException e) { - assertEquals(ERROR_APPLICATION_NAME_INVALID, e.getMessage()); - } - - app.setName("test"); - // no version - try { - ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED); - Assert.fail(EXCEPTION_PREFIX + " service with no version"); - } catch (IllegalArgumentException e) { - assertEquals(String.format(ERROR_APPLICATION_VERSION_INVALID, - app.getName()), e.getMessage()); - } - - app.setVersion("v1"); - // bad format name - String[] badNames = {"4finance", "Finance", "finance@home", LEN_64_STR}; - for (String badName : badNames) { - 
app.setName(badName); - try { - ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED); - Assert.fail(EXCEPTION_PREFIX + "service with bad name " + badName); - } catch (IllegalArgumentException e) { - - } - } - - // launch command not specified - app.setName(LEN_64_STR); - Component comp = new Component().name("comp1"); - app.addComponent(comp); - try { - ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DEFAULT_DNS); - Assert.fail(EXCEPTION_PREFIX + "service with no launch command"); - } catch (IllegalArgumentException e) { - assertEquals(RestApiErrorMessages.ERROR_ABSENT_LAUNCH_COMMAND, - e.getMessage()); - } - - // launch command not specified - app.setName(LEN_64_STR.substring(0, RegistryConstants - .MAX_FQDN_LABEL_LENGTH)); - try { - ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED); - Assert.fail(EXCEPTION_PREFIX + "service with no launch command"); - } catch (IllegalArgumentException e) { - assertEquals(RestApiErrorMessages.ERROR_ABSENT_LAUNCH_COMMAND, - e.getMessage()); - } - - // memory not specified - comp.setLaunchCommand("sleep 1"); - Resource res = new Resource(); - app.setResource(res); - try { - ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED); - Assert.fail(EXCEPTION_PREFIX + "service with no memory"); - } catch (IllegalArgumentException e) { - assertEquals(String.format( - RestApiErrorMessages.ERROR_RESOURCE_MEMORY_FOR_COMP_INVALID, - comp.getName()), e.getMessage()); - } - - // invalid no of cpus - res.setMemory("100mb"); - res.setCpus(-2); - try { - ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED); - Assert.fail( - EXCEPTION_PREFIX + "service with invalid no of cpus"); - } catch (IllegalArgumentException e) { - assertEquals(String.format( - RestApiErrorMessages.ERROR_RESOURCE_CPUS_FOR_COMP_INVALID_RANGE, - comp.getName()), e.getMessage()); - } - - // number of containers not specified - res.setCpus(2); - try { - ServiceApiUtil.validateAndResolveService(app, sfs, 
CONF_DNS_ENABLED); - Assert.fail(EXCEPTION_PREFIX + "service with no container count"); - } catch (IllegalArgumentException e) { - Assert.assertTrue(e.getMessage() - .contains(ERROR_CONTAINERS_COUNT_INVALID)); - } - - // specifying profile along with cpus/memory raises exception - res.setProfile("hbase_finance_large"); - try { - ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED); - Assert.fail(EXCEPTION_PREFIX - + "service with resource profile along with cpus/memory"); - } catch (IllegalArgumentException e) { - assertEquals(String.format(RestApiErrorMessages - .ERROR_RESOURCE_PROFILE_MULTIPLE_VALUES_FOR_COMP_NOT_SUPPORTED, - comp.getName()), - e.getMessage()); - } - - // currently resource profile alone is not supported. - // TODO: remove the next test once resource profile alone is supported. - res.setCpus(null); - res.setMemory(null); - try { - ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED); - Assert.fail(EXCEPTION_PREFIX + "service with resource profile only"); - } catch (IllegalArgumentException e) { - assertEquals(ERROR_RESOURCE_PROFILE_NOT_SUPPORTED_YET, - e.getMessage()); - } - - // unset profile here and add cpus/memory back - res.setProfile(null); - res.setCpus(2); - res.setMemory("2gb"); - - // null number of containers - try { - ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED); - Assert.fail(EXCEPTION_PREFIX + "null number of containers"); - } catch (IllegalArgumentException e) { - Assert.assertTrue(e.getMessage() - .startsWith(ERROR_CONTAINERS_COUNT_INVALID)); - } - } - - @Test - public void testArtifacts() throws IOException { - SliderFileSystem sfs = ServiceTestUtils.initMockFs(); - - Service app = new Service(); - app.setName("service1"); - app.setVersion("v1"); - Resource res = new Resource(); - app.setResource(res); - res.setMemory("512M"); - - // no artifact id fails with default type - Artifact artifact = new Artifact(); - app.setArtifact(artifact); - String compName = "comp1"; - 
Component comp = ServiceTestUtils.createComponent(compName); - - app.setComponents(Collections.singletonList(comp)); - try { - ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED); - Assert.fail(EXCEPTION_PREFIX + "service with no artifact id"); - } catch (IllegalArgumentException e) { - assertEquals(String.format(ERROR_ARTIFACT_ID_FOR_COMP_INVALID, compName), - e.getMessage()); - } - - // no artifact id fails with SERVICE type - artifact.setType(Artifact.TypeEnum.SERVICE); - try { - ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED); - Assert.fail(EXCEPTION_PREFIX + "service with no artifact id"); - } catch (IllegalArgumentException e) { - assertEquals(ERROR_ARTIFACT_ID_INVALID, e.getMessage()); - } - - // no artifact id fails with TARBALL type - artifact.setType(Artifact.TypeEnum.TARBALL); - try { - ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED); - Assert.fail(EXCEPTION_PREFIX + "service with no artifact id"); - } catch (IllegalArgumentException e) { - assertEquals(String.format(ERROR_ARTIFACT_ID_FOR_COMP_INVALID, compName), - e.getMessage()); - } - - // everything valid here - artifact.setType(Artifact.TypeEnum.DOCKER); - artifact.setId("docker.io/centos:centos7"); - try { - ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED); - } catch (IllegalArgumentException e) { - LOG.error("service attributes specified should be valid here", e); - Assert.fail(NO_EXCEPTION_PREFIX + e.getMessage()); - } - - assertEquals(app.getLifetime(), DEFAULT_UNLIMITED_LIFETIME); - } - - private static Resource createValidResource() { - Resource res = new Resource(); - res.setMemory("512M"); - return res; - } - - private static Component createValidComponent(String compName) { - Component comp = new Component(); - comp.setName(compName); - comp.setResource(createValidResource()); - comp.setNumberOfContainers(1L); - comp.setLaunchCommand("sleep 1"); - return comp; - } - - private static Service 
createValidApplication(String compName) { - Service app = new Service(); - app.setName("name"); - app.setVersion("v1"); - app.setResource(createValidResource()); - if (compName != null) { - app.addComponent(createValidComponent(compName)); - } - return app; - } - - @Test - public void testExternalApplication() throws IOException { - Service ext = createValidApplication("comp1"); - SliderFileSystem sfs = ServiceTestUtils.initMockFs(ext); - - Service app = createValidApplication(null); - - Artifact artifact = new Artifact(); - artifact.setType(Artifact.TypeEnum.SERVICE); - artifact.setId("id"); - app.setArtifact(artifact); - app.addComponent(ServiceTestUtils.createComponent("comp2")); - try { - ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED); - } catch (IllegalArgumentException e) { - Assert.fail(NO_EXCEPTION_PREFIX + e.getMessage()); - } - - assertEquals(1, app.getComponents().size()); - assertNotNull(app.getComponent("comp2")); - } - - @Test - public void testDuplicateComponents() throws IOException { - SliderFileSystem sfs = ServiceTestUtils.initMockFs(); - - String compName = "comp1"; - Service app = createValidApplication(compName); - app.addComponent(createValidComponent(compName)); - - // duplicate component name fails - try { - ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED); - Assert.fail(EXCEPTION_PREFIX + "service with component collision"); - } catch (IllegalArgumentException e) { - assertEquals("Component name collision: " + compName, e.getMessage()); - } - } - - @Test - public void testComponentNameSameAsServiceName() throws IOException { - SliderFileSystem sfs = ServiceTestUtils.initMockFs(); - Service app = new Service(); - app.setName("test"); - app.setVersion("v1"); - app.addComponent(createValidComponent("test")); - - //component name same as service name - try { - ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED); - Assert.fail(EXCEPTION_PREFIX + "component name matches service name"); - 
} catch (IllegalArgumentException e) { - assertEquals("Component name test must not be same as service name test", - e.getMessage()); - } - } - - @Test - public void testExternalDuplicateComponent() throws IOException { - Service ext = createValidApplication("comp1"); - SliderFileSystem sfs = ServiceTestUtils.initMockFs(ext); - - Service app = createValidApplication("comp1"); - Artifact artifact = new Artifact(); - artifact.setType(Artifact.TypeEnum.SERVICE); - artifact.setId("id"); - app.getComponent("comp1").setArtifact(artifact); - - // duplicate component name okay in the case of SERVICE component - try { - ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED); - } catch (IllegalArgumentException e) { - Assert.fail(NO_EXCEPTION_PREFIX + e.getMessage()); - } - } - - @Test - public void testExternalComponent() throws IOException { - Service ext = createValidApplication("comp1"); - SliderFileSystem sfs = ServiceTestUtils.initMockFs(ext); - - Service app = createValidApplication("comp2"); - Artifact artifact = new Artifact(); - artifact.setType(Artifact.TypeEnum.SERVICE); - artifact.setId("id"); - app.setArtifact(artifact); - - try { - ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED); - } catch (IllegalArgumentException e) { - Assert.fail(NO_EXCEPTION_PREFIX + e.getMessage()); - } - - assertEquals(1, app.getComponents().size()); - // artifact ID not inherited from global - assertNotNull(app.getComponent("comp2")); - - // set SERVICE artifact id on component - app.getComponent("comp2").setArtifact(artifact); - - try { - ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED); - } catch (IllegalArgumentException e) { - Assert.fail(NO_EXCEPTION_PREFIX + e.getMessage()); - } - - assertEquals(1, app.getComponents().size()); - // original component replaced by external component - assertNotNull(app.getComponent("comp1")); - } - - public static void verifyDependencySorting(List<Component> components, - Component... 
expectedSorting) { - Collection<Component> actualSorting = ServiceApiUtil.sortByDependencies( - components); - assertEquals(expectedSorting.length, actualSorting.size()); - int i = 0; - for (Component component : actualSorting) { - assertEquals(expectedSorting[i++], component); - } - } - - @Test - public void testDependencySorting() throws IOException { - Component a = ServiceTestUtils.createComponent("a"); - Component b = ServiceTestUtils.createComponent("b"); - Component c = ServiceTestUtils.createComponent("c"); - Component d = - ServiceTestUtils.createComponent("d").dependencies(Arrays.asList("c")); - Component e = ServiceTestUtils.createComponent("e") - .dependencies(Arrays.asList("b", "d")); - - verifyDependencySorting(Arrays.asList(a, b, c), a, b, c); - verifyDependencySorting(Arrays.asList(c, a, b), c, a, b); - verifyDependencySorting(Arrays.asList(a, b, c, d, e), a, b, c, d, e); - verifyDependencySorting(Arrays.asList(e, d, c, b, a), c, b, a, d, e); - - c.setDependencies(Arrays.asList("e")); - try { - verifyDependencySorting(Arrays.asList(a, b, c, d, e)); - Assert.fail(EXCEPTION_PREFIX + "components with dependency cycle"); - } catch (IllegalArgumentException ex) { - assertEquals(String.format( - RestApiErrorMessages.ERROR_DEPENDENCY_CYCLE, Arrays.asList(c, d, - e)), ex.getMessage()); - } - - SliderFileSystem sfs = ServiceTestUtils.initMockFs(); - Service service = createValidApplication(null); - service.setComponents(Arrays.asList(c, d, e)); - try { - ServiceApiUtil.validateAndResolveService(service, sfs, - CONF_DEFAULT_DNS); - Assert.fail(EXCEPTION_PREFIX + "components with bad dependencies"); - } catch (IllegalArgumentException ex) { - assertEquals(String.format( - RestApiErrorMessages.ERROR_DEPENDENCY_INVALID, "b", "e"), ex - .getMessage()); - } - } - - @Test - public void testInvalidComponent() throws IOException { - SliderFileSystem sfs = ServiceTestUtils.initMockFs(); - testComponent(sfs); - } - - @Test - public void testValidateCompName() { - 
String[] invalidNames = { - "EXAMPLE", // UPPER case not allowed - "example_app" // underscore not allowed. - }; - for (String name : invalidNames) { - try { - ServiceApiUtil.validateNameFormat(name, new Configuration()); - Assert.fail(); - } catch (IllegalArgumentException ex) { - ex.printStackTrace(); - } - } - } - - private static void testComponent(SliderFileSystem sfs) - throws IOException { - int maxLen = RegistryConstants.MAX_FQDN_LABEL_LENGTH; - assertEquals(19, Long.toString(Long.MAX_VALUE).length()); - maxLen = maxLen - Long.toString(Long.MAX_VALUE).length(); - - String compName = LEN_64_STR.substring(0, maxLen + 1); - Service app = createValidApplication(null); - app.addComponent(createValidComponent(compName)); - - // invalid component name fails if dns is enabled - try { - ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED); - Assert.fail(EXCEPTION_PREFIX + "service with invalid component name"); - } catch (IllegalArgumentException e) { - assertEquals(String.format(RestApiErrorMessages - .ERROR_COMPONENT_NAME_INVALID, maxLen, compName), e.getMessage()); - } - - // does not fail if dns is disabled - try { - ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DEFAULT_DNS); - } catch (IllegalArgumentException e) { - Assert.fail(NO_EXCEPTION_PREFIX + e.getMessage()); - } - - compName = LEN_64_STR.substring(0, maxLen); - app = createValidApplication(null); - app.addComponent(createValidComponent(compName)); - - // does not fail - try { - ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED); - } catch (IllegalArgumentException e) { - Assert.fail(NO_EXCEPTION_PREFIX + e.getMessage()); - } - } - - @Test - public void testPlacementPolicy() throws IOException { - SliderFileSystem sfs = ServiceTestUtils.initMockFs(); - Service app = createValidApplication("comp-a"); - Component comp = app.getComponents().get(0); - PlacementPolicy pp = new PlacementPolicy(); - PlacementConstraint pc = new PlacementConstraint(); - 
pc.setName("CA1"); - pp.setConstraints(Collections.singletonList(pc)); - comp.setPlacementPolicy(pp); - - try { - ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED); - Assert.fail(EXCEPTION_PREFIX + "constraint with no type"); - } catch (IllegalArgumentException e) { - assertEquals(String.format( - RestApiErrorMessages.ERROR_PLACEMENT_POLICY_CONSTRAINT_TYPE_NULL, - "CA1 ", "comp-a"), e.getMessage()); - } - - // Set the type - pc.setType(PlacementType.ANTI_AFFINITY); - - try { - ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED); - Assert.fail(EXCEPTION_PREFIX + "constraint with no scope"); - } catch (IllegalArgumentException e) { - assertEquals(String.format( - RestApiErrorMessages.ERROR_PLACEMENT_POLICY_CONSTRAINT_SCOPE_NULL, - "CA1 ", "comp-a"), e.getMessage()); - } - - // Set the scope - pc.setScope(PlacementScope.NODE); - - try { - ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED); - Assert.fail(EXCEPTION_PREFIX + "constraint with no tag(s)"); - } catch (IllegalArgumentException e) { - assertEquals(String.format( - RestApiErrorMessages.ERROR_PLACEMENT_POLICY_CONSTRAINT_TAGS_NULL, - "CA1 ", "comp-a"), e.getMessage()); - } - - // Set a target tag - but an invalid one - pc.setTargetTags(Collections.singletonList("comp-invalid")); - - try { - ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED); - Assert.fail(EXCEPTION_PREFIX + "constraint with invalid tag name"); - } catch (IllegalArgumentException e) { - assertEquals( - String.format( - RestApiErrorMessages.ERROR_PLACEMENT_POLICY_TAG_NAME_NOT_SAME, - "comp-invalid", "comp-a", "comp-a", "comp-a"), - e.getMessage()); - } - - // Set valid target tags now - pc.setTargetTags(Collections.singletonList("comp-a")); - - // Finally it should succeed - try { - ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED); - } catch (IllegalArgumentException e) { - Assert.fail(NO_EXCEPTION_PREFIX + e.getMessage()); - } - } - - @Test - public void 
testKerberosPrincipal() throws IOException { - SliderFileSystem sfs = ServiceTestUtils.initMockFs(); - Service app = createValidApplication("comp-a"); - KerberosPrincipal kp = new KerberosPrincipal(); - kp.setKeytab("/some/path"); - kp.setPrincipalName("user/_HOST@domain.com"); - app.setKerberosPrincipal(kp); - - try { - ServiceApiUtil.validateKerberosPrincipal(app.getKerberosPrincipal()); - Assert.fail(EXCEPTION_PREFIX + "service with invalid keytab URI scheme"); - } catch (IllegalArgumentException e) { - assertEquals( - String.format(RestApiErrorMessages.ERROR_KEYTAB_URI_SCHEME_INVALID, - kp.getKeytab()), - e.getMessage()); - } - - kp.setKeytab("/ blank / in / paths"); - try { - ServiceApiUtil.validateKerberosPrincipal(app.getKerberosPrincipal()); - Assert.fail(EXCEPTION_PREFIX + "service with invalid keytab"); - } catch (IllegalArgumentException e) { - // strip out the %s at the end of the RestApiErrorMessages string constant - assertTrue(e.getMessage().contains( - RestApiErrorMessages.ERROR_KEYTAB_URI_INVALID.substring(0, - RestApiErrorMessages.ERROR_KEYTAB_URI_INVALID.length() - 2))); - } - - kp.setKeytab("file:///tmp/a.keytab"); - // now it should succeed - try { - ServiceApiUtil.validateKerberosPrincipal(app.getKerberosPrincipal()); - } catch (IllegalArgumentException e) { - Assert.fail(NO_EXCEPTION_PREFIX + e.getMessage()); - } - } - - @Test - public void testKerberosPrincipalNameFormat() throws IOException { - Service app = createValidApplication("comp-a"); - KerberosPrincipal kp = new KerberosPrincipal(); - kp.setPrincipalName("user@domain.com"); - app.setKerberosPrincipal(kp); - - try { - ServiceApiUtil.validateKerberosPrincipal(app.getKerberosPrincipal()); - Assert.fail(EXCEPTION_PREFIX + "service with invalid principal name format."); - } catch (IllegalArgumentException e) { - assertEquals( - String.format(RestApiErrorMessages.ERROR_KERBEROS_PRINCIPAL_NAME_FORMAT, - kp.getPrincipalName()), - e.getMessage()); - } - -
kp.setPrincipalName("user/_HOST@domain.com"); - try { - ServiceApiUtil.validateKerberosPrincipal(app.getKerberosPrincipal()); - } catch (IllegalArgumentException e) { - Assert.fail(NO_EXCEPTION_PREFIX + e.getMessage()); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e557c6bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java index fc509f1..a37cabe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java @@ -19,23 +19,26 @@ package org.apache.hadoop.yarn.service; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.registry.client.api.RegistryOperations; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.service.api.records.Artifact; import org.apache.hadoop.yarn.service.api.records.ComponentState; +import org.apache.hadoop.yarn.service.api.records.ContainerState; import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.api.records.ServiceState; +import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; +import
org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent; +import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType; import org.apache.hadoop.yarn.service.exceptions.SliderException; -import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders; import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import java.io.IOException; -import java.util.Map; - -import static org.mockito.Mockito.mock; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.TimeoutException; /** * Tests for {@link ServiceManager}. @@ -46,117 +49,120 @@ public class TestServiceManager { public ServiceTestUtils.ServiceFSWatcher rule = new ServiceTestUtils.ServiceFSWatcher(); - @Test - public void testUpgrade() throws IOException, SliderException { - ServiceManager serviceManager = createTestServiceManager("testUpgrade"); - upgrade(serviceManager, "v2", false, false); + @Test (timeout = TIMEOUT) + public void testUpgrade() throws Exception { + ServiceContext context = createServiceContext("testUpgrade"); + initUpgrade(context, "v2", false, false, false); Assert.assertEquals("service not upgraded", ServiceState.UPGRADING, - serviceManager.getServiceSpec().getState()); + context.getServiceManager().getServiceSpec().getState()); } - @Test + @Test (timeout = TIMEOUT) public void testRestartNothingToUpgrade() - throws IOException, SliderException { - ServiceManager serviceManager = createTestServiceManager( + throws Exception { + ServiceContext context = createServiceContext( "testRestartNothingToUpgrade"); - upgrade(serviceManager, "v2", false, false); - - //make components stable - serviceManager.getServiceSpec().getComponents().forEach(comp -> { - comp.setState(ComponentState.STABLE); - }); - serviceManager.handle(new ServiceEvent(ServiceEventType.START)); + initUpgrade(context, "v2", false, false, false); + ServiceManager manager = 
context.getServiceManager(); + //make components stable by upgrading all instances + upgradeAllInstances(context); + + context.scheduler.getDispatcher().getEventHandler().handle( + new ServiceEvent(ServiceEventType.START)); + GenericTestUtils.waitFor(()-> + context.service.getState().equals(ServiceState.STABLE), + CHECK_EVERY_MILLIS, TIMEOUT); Assert.assertEquals("service not re-started", ServiceState.STABLE, - serviceManager.getServiceSpec().getState()); + manager.getServiceSpec().getState()); } - @Test - public void testAutoFinalizeNothingToUpgrade() throws IOException, - SliderException { - ServiceManager serviceManager = createTestServiceManager( + @Test(timeout = TIMEOUT) + public void testAutoFinalizeNothingToUpgrade() throws Exception { + ServiceContext context = createServiceContext( "testAutoFinalizeNothingToUpgrade"); - upgrade(serviceManager, "v2", false, true); - - //make components stable - serviceManager.getServiceSpec().getComponents().forEach(comp -> - comp.setState(ComponentState.STABLE)); - serviceManager.handle(new ServiceEvent(ServiceEventType.CHECK_STABLE)); + initUpgrade(context, "v2", false, true, false); + ServiceManager manager = context.getServiceManager(); + //make components stable by upgrading all instances + upgradeAllInstances(context); + + GenericTestUtils.waitFor(()-> + context.service.getState().equals(ServiceState.STABLE), + CHECK_EVERY_MILLIS, TIMEOUT); Assert.assertEquals("service stable", ServiceState.STABLE, - serviceManager.getServiceSpec().getState()); + manager.getServiceSpec().getState()); } - @Test + @Test(timeout = TIMEOUT) public void testRestartWithPendingUpgrade() - throws IOException, SliderException { - ServiceManager serviceManager = createTestServiceManager("testRestart"); - upgrade(serviceManager, "v2", true, false); - serviceManager.handle(new ServiceEvent(ServiceEventType.START)); + throws Exception { + ServiceContext context = createServiceContext("testRestart"); + initUpgrade(context, "v2", true, false, 
false); + ServiceManager manager = context.getServiceManager(); + + context.scheduler.getDispatcher().getEventHandler().handle( + new ServiceEvent(ServiceEventType.START)); + context.scheduler.getDispatcher().stop(); Assert.assertEquals("service should still be upgrading", - ServiceState.UPGRADING, serviceManager.getServiceSpec().getState()); + ServiceState.UPGRADING, manager.getServiceSpec().getState()); } - @Test - public void testCheckState() throws IOException, SliderException { - ServiceManager serviceManager = createTestServiceManager( - "testCheckState"); - upgrade(serviceManager, "v2", true, false); + @Test(timeout = TIMEOUT) + public void testFinalize() throws Exception { + ServiceContext context = createServiceContext("testCheckState"); + initUpgrade(context, "v2", true, false, false); + ServiceManager manager = context.getServiceManager(); Assert.assertEquals("service not upgrading", ServiceState.UPGRADING, - serviceManager.getServiceSpec().getState()); + manager.getServiceSpec().getState()); - // make components stable - serviceManager.getServiceSpec().getComponents().forEach(comp -> { - comp.setState(ComponentState.STABLE); - }); - ServiceEvent checkStable = new ServiceEvent(ServiceEventType.CHECK_STABLE); - serviceManager.handle(checkStable); - Assert.assertEquals("service should still be upgrading", - ServiceState.UPGRADING, serviceManager.getServiceSpec().getState()); + //make components stable by upgrading all instances + upgradeAllInstances(context); // finalize service - ServiceEvent restart = new ServiceEvent(ServiceEventType.START); - serviceManager.handle(restart); - Assert.assertEquals("service not stable", - ServiceState.STABLE, serviceManager.getServiceSpec().getState()); + context.scheduler.getDispatcher().getEventHandler().handle( + new ServiceEvent(ServiceEventType.START)); + GenericTestUtils.waitFor(()-> + context.service.getState().equals(ServiceState.STABLE), + CHECK_EVERY_MILLIS, TIMEOUT); + Assert.assertEquals("service not 
re-started", ServiceState.STABLE, + manager.getServiceSpec().getState()); - validateUpgradeFinalization(serviceManager.getName(), "v2"); + validateUpgradeFinalization(manager.getName(), "v2"); } - @Test - public void testCheckStateAutoFinalize() throws IOException, SliderException { - ServiceManager serviceManager = createTestServiceManager( - "testCheckState"); - serviceManager.getServiceSpec().setState( + @Test(timeout = TIMEOUT) + public void testAutoFinalize() throws Exception { + ServiceContext context = createServiceContext("testCheckStateAutoFinalize"); + ServiceManager manager = context.getServiceManager(); + manager.getServiceSpec().setState( ServiceState.UPGRADING_AUTO_FINALIZE); - upgrade(serviceManager, "v2", true, true); - Assert.assertEquals("service not upgrading", - ServiceState.UPGRADING_AUTO_FINALIZE, - serviceManager.getServiceSpec().getState()); + initUpgrade(context, "v2", true, true, false); // make components stable - serviceManager.getServiceSpec().getComponents().forEach(comp -> - comp.setState(ComponentState.STABLE)); - ServiceEvent checkStable = new ServiceEvent(ServiceEventType.CHECK_STABLE); - serviceManager.handle(checkStable); + upgradeAllInstances(context); + + GenericTestUtils.waitFor(() -> + context.service.getState().equals(ServiceState.STABLE), + CHECK_EVERY_MILLIS, TIMEOUT); Assert.assertEquals("service not stable", - ServiceState.STABLE, serviceManager.getServiceSpec().getState()); + ServiceState.STABLE, manager.getServiceSpec().getState()); - validateUpgradeFinalization(serviceManager.getName(), "v2"); + validateUpgradeFinalization(manager.getName(), "v2"); } @Test - public void testInvalidUpgrade() throws IOException, SliderException { - ServiceManager serviceManager = createTestServiceManager( - "testInvalidUpgrade"); - serviceManager.getServiceSpec().setState( + public void testInvalidUpgrade() throws Exception { + ServiceContext serviceContext = createServiceContext("testInvalidUpgrade"); + ServiceManager manager = 
serviceContext.getServiceManager(); + manager.getServiceSpec().setState( ServiceState.UPGRADING_AUTO_FINALIZE); Service upgradedDef = ServiceTestUtils.createExampleApplication(); - upgradedDef.setName(serviceManager.getName()); + upgradedDef.setName(manager.getName()); upgradedDef.setVersion("v2"); upgradedDef.setLifetime(2L); writeUpgradedDef(upgradedDef); try { - serviceManager.processUpgradeRequest("v2", true); + manager.processUpgradeRequest("v2", true, false); } catch (Exception ex) { Assert.assertTrue(ex instanceof UnsupportedOperationException); return; @@ -164,6 +170,32 @@ public class TestServiceManager { Assert.fail(); } + @Test(timeout = TIMEOUT) + public void testExpressUpgrade() throws Exception { + ServiceContext context = createServiceContext("testExpressUpgrade"); + ServiceManager manager = context.getServiceManager(); + manager.getServiceSpec().setState( + ServiceState.EXPRESS_UPGRADING); + initUpgrade(context, "v2", true, true, true); + + List<String> comps = ServiceApiUtil.resolveCompsDependency(context.service); + // wait till instances of first component are in upgrade + String comp1 = comps.get(0); + upgradeInstancesOf(context, comp1); + + // wait till instances of second component are in upgrade + String comp2 = comps.get(1); + upgradeInstancesOf(context, comp2); + + GenericTestUtils.waitFor(() -> + context.service.getState().equals(ServiceState.STABLE), + CHECK_EVERY_MILLIS, TIMEOUT); + + Assert.assertEquals("service not stable", + ServiceState.STABLE, manager.getServiceSpec().getState()); + validateUpgradeFinalization(manager.getName(), "v2"); + } + private void validateUpgradeFinalization(String serviceName, String expectedVersion) throws IOException { Service savedSpec = ServiceApiUtil.loadService(rule.getFs(), serviceName); @@ -172,15 +204,16 @@ public class TestServiceManager { Assert.assertNotNull("app id not present", savedSpec.getId()); Assert.assertEquals("state not stable", ServiceState.STABLE, savedSpec.getState()); - 
savedSpec.getComponents().forEach(compSpec -> { - Assert.assertEquals("comp not stable", ComponentState.STABLE, - compSpec.getState()); - }); + savedSpec.getComponents().forEach(compSpec -> + Assert.assertEquals("comp not stable", ComponentState.STABLE, + compSpec.getState())); } - private void upgrade(ServiceManager serviceManager, String version, - boolean upgradeArtifact, boolean autoFinalize) - throws IOException, SliderException { + private void initUpgrade(ServiceContext context, String version, + boolean upgradeArtifact, boolean autoFinalize, boolean expressUpgrade) + throws IOException, SliderException, TimeoutException, + InterruptedException { + ServiceManager serviceManager = context.getServiceManager(); Service upgradedDef = ServiceTestUtils.createExampleApplication(); upgradedDef.setName(serviceManager.getName()); upgradedDef.setVersion(version); @@ -191,39 +224,81 @@ public class TestServiceManager { }); } writeUpgradedDef(upgradedDef); - serviceManager.processUpgradeRequest(version, autoFinalize); + serviceManager.processUpgradeRequest(version, autoFinalize, expressUpgrade); ServiceEvent upgradeEvent = new ServiceEvent(ServiceEventType.UPGRADE); - upgradeEvent.setVersion(version); - if (autoFinalize) { - upgradeEvent.setAutoFinalize(true); - } - serviceManager.handle(upgradeEvent); + upgradeEvent.setVersion(version).setExpressUpgrade(expressUpgrade) + .setAutoFinalize(autoFinalize); + + GenericTestUtils.waitFor(()-> { + ServiceState serviceState = context.service.getState(); + if (serviceState.equals(ServiceState.UPGRADING) || + serviceState.equals(ServiceState.UPGRADING_AUTO_FINALIZE) || + serviceState.equals(ServiceState.EXPRESS_UPGRADING)) { + return true; + } + return false; + }, CHECK_EVERY_MILLIS, TIMEOUT); + } + + private void upgradeAllInstances(ServiceContext context) throws + TimeoutException, InterruptedException { + // upgrade the instances + context.scheduler.getLiveInstances().forEach(((containerId, instance) -> { + 
ComponentInstanceEvent event = new ComponentInstanceEvent(containerId, + ComponentInstanceEventType.UPGRADE); + context.scheduler.getDispatcher().getEventHandler().handle(event); + })); + + // become ready + context.scheduler.getLiveInstances().forEach(((containerId, instance) -> { + ComponentInstanceEvent event = new ComponentInstanceEvent(containerId, + ComponentInstanceEventType.BECOME_READY); + + context.scheduler.getDispatcher().getEventHandler().handle(event); + })); + GenericTestUtils.waitFor(()-> { + for (ComponentInstance instance: + context.scheduler.getLiveInstances().values()) { + if (!instance.getContainerState().equals(ContainerState.READY)) { + return false; + } + } + return true; + }, CHECK_EVERY_MILLIS, TIMEOUT); } - private ServiceManager createTestServiceManager(String name) - throws IOException { - ServiceContext context = new ServiceContext(); - context.service = createBaseDef(name); - context.fs = rule.getFs(); - - context.scheduler = new ServiceScheduler(context) { - @Override - protected YarnRegistryViewForProviders createYarnRegistryOperations( - ServiceContext context, RegistryOperations registryClient) { - return mock(YarnRegistryViewForProviders.class); + private void upgradeInstancesOf(ServiceContext context, String compName) + throws TimeoutException, InterruptedException { + Collection<ComponentInstance> compInstances = context.scheduler + .getAllComponents().get(compName).getAllComponentInstances(); + GenericTestUtils.waitFor(() -> { + for (ComponentInstance instance : compInstances) { + if (!instance.getContainerState().equals(ContainerState.UPGRADING)) { + return false; + } } - }; + return true; + }, CHECK_EVERY_MILLIS, TIMEOUT); - context.scheduler.init(rule.getConf()); + // instances of comp1 get upgraded and become ready event is triggered + // become ready + compInstances.forEach(instance -> { + ComponentInstanceEvent event = new ComponentInstanceEvent( + instance.getContainer().getId(), + 
ComponentInstanceEventType.BECOME_READY); - Map<String, org.apache.hadoop.yarn.service.component.Component> - componentState = context.scheduler.getAllComponents(); - context.service.getComponents().forEach(component -> { - componentState.put(component.getName(), - new org.apache.hadoop.yarn.service.component.Component(component, - 1L, context)); + context.scheduler.getDispatcher().getEventHandler().handle(event); }); - return new ServiceManager(context); + } + + private ServiceContext createServiceContext(String name) + throws Exception { + Service service = createBaseDef(name); + ServiceContext context = new MockRunningServiceContext(rule, + service); + context.scheduler.getDispatcher().setDrainEventsOnStop(); + context.scheduler.getDispatcher().start(); + return context; } public static Service createBaseDef(String name) { @@ -257,4 +332,6 @@ public class TestServiceManager { upgradedDef); } + private static final int TIMEOUT = 200000; + private static final int CHECK_EVERY_MILLIS = 100; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/e557c6bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java index 8b13b24..216d88f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java @@ -415,6 +415,41 @@ public class TestYarnNativeServices extends ServiceTestUtils { client.actionDestroy(service.getName()); } + @Test(timeout = 200000) + public void testExpressUpgrade() throws Exception { + setupInternal(NUM_NMS); + getConf().setBoolean(YARN_SERVICE_UPGRADE_ENABLED, true); + ServiceClient client = createClient(getConf()); + + Service service = createExampleApplication(); + client.actionCreate(service); + waitForServiceToBeStable(client, service); + + // upgrade the service + Component component = service.getComponents().iterator().next(); + service.setState(ServiceState.EXPRESS_UPGRADING); + service.setVersion("v2"); + component.getConfiguration().getEnv().put("key1", "val1"); + Component component2 = service.getComponent("compb"); + component2.getConfiguration().getEnv().put("key2", "val2"); + client.actionUpgradeExpress(service); + + // wait for upgrade to complete + waitForServiceToBeStable(client, service); + Service active = client.getStatus(service.getName()); + Assert.assertEquals("component not stable", ComponentState.STABLE, + active.getComponent(component.getName()).getState()); + Assert.assertEquals("compa does not have new env", "val1", + active.getComponent(component.getName()).getConfiguration() + .getEnv("key1")); + Assert.assertEquals("compb does not have new env", "val2", + active.getComponent(component2.getName()).getConfiguration() + .getEnv("key2")); + LOG.info("Stop/destroy service {}", service); + client.actionStop(service.getName(), true); + client.actionDestroy(service.getName()); + } + // Test to verify ANTI_AFFINITY placement policy // 1. Start mini cluster with 3 NMs and scheduler placement-constraint handler // 2. 
Create an example service with 3 containers --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org