SLIDER-149 complete client switch to new registry code ... old code still present but unused. Tests working
Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/a0528889 Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/a0528889 Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/a0528889 Branch: refs/heads/feature/SLIDER-149_Support_a_YARN_service_registry Commit: a0528889fc47977794efdc2354597c7289e382b8 Parents: be1174c Author: Steve Loughran <ste...@apache.org> Authored: Wed Aug 27 20:57:31 2014 +0100 Committer: Steve Loughran <ste...@apache.org> Committed: Wed Aug 27 20:57:31 2014 +0100 ---------------------------------------------------------------------- .../org/apache/slider/client/SliderClient.java | 172 ++++++++- .../registry/info/CommonRegistryConstants.java | 25 -- .../registry/info/CustomRegistryConstants.java | 7 + .../registry/retrieve/RegistryRetriever.java | 62 +-- .../slideram/SliderAMProviderService.java | 17 +- .../server/appmaster/SliderAppMaster.java | 37 +- .../utility/AbstractSliderLaunchedService.java | 15 +- .../YarnRegistryViewForProviders.java | 7 +- .../standalone/TestStandaloneRegistryAM.groovy | 367 ------------------ .../agent/standalone/TestYarnRegistryAM.groovy | 376 +++++++++++++++++++ .../apache/slider/test/MicroZKCluster.groovy | 29 +- .../apache/slider/test/SliderTestUtils.groovy | 10 +- .../slider/test/YarnMiniClusterTestBase.groovy | 7 +- .../test/YarnZKMiniClusterTestBase.groovy | 21 +- 14 files changed, 671 insertions(+), 481 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/a0528889/slider-core/src/main/java/org/apache/slider/client/SliderClient.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/client/SliderClient.java b/slider-core/src/main/java/org/apache/slider/client/SliderClient.java index ebf2133..0f7fe5a 100644 --- 
a/slider-core/src/main/java/org/apache/slider/client/SliderClient.java +++ b/slider-core/src/main/java/org/apache/slider/client/SliderClient.java @@ -20,8 +20,10 @@ package org.apache.slider.client; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathNotFoundException; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.HdfsConfiguration; @@ -38,8 +40,13 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.registry.client.api.RegistryConstants; +import org.apache.hadoop.yarn.registry.client.binding.BindingUtils; +import org.apache.hadoop.yarn.registry.client.binding.RecordOperations; import org.apache.hadoop.yarn.registry.client.binding.ZKPathDumper; import org.apache.hadoop.yarn.registry.client.services.RegistryOperationsService; +import org.apache.hadoop.yarn.registry.client.types.Endpoint; +import org.apache.hadoop.yarn.registry.client.types.RegistryPathStatus; +import org.apache.hadoop.yarn.registry.client.types.ServiceRecord; import org.apache.slider.api.ClusterDescription; import org.apache.slider.api.ClusterNode; import org.apache.slider.api.InternalKeys; @@ -2290,14 +2297,14 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe registryArgs.validate(); try { if (registryArgs.list) { - actionRegistryList(registryArgs); + actionRegistryListYarn(registryArgs); } else if (registryArgs.listConf) { // list the configurations - actionRegistryListConfigs(registryArgs); + actionRegistryListConfigsYarn(registryArgs); } else if (SliderUtils.isSet(registryArgs.getConf)) { // get a configuration 
PublishedConfiguration publishedConfiguration = - actionRegistryGetConfig(registryArgs); + actionRegistryGetConfigYarn(registryArgs); outputConfig(publishedConfiguration, registryArgs); } else { // it's an unknown command @@ -2305,9 +2312,14 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe "Bad command arguments for " + ACTION_REGISTRY + " " + registryArgs); } +// JDK7 } catch (FileNotFoundException e) { log.info("{}", e.toString()); - log.debug("{}",e, e); + log.debug("{}", e, e); + return EXIT_NOT_FOUND; + } catch (PathNotFoundException e) { + log.info("{}", e.toString()); + log.debug("{}", e, e); return EXIT_NOT_FOUND; } return EXIT_SUCCESS; @@ -2322,6 +2334,7 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe * @throws IOException Network or other problems */ @VisibleForTesting + @Deprecated public List<ServiceInstanceData> actionRegistryList( ActionRegistryArgs registryArgs) throws YarnException, IOException { @@ -2345,6 +2358,72 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe return sids; } + /** + * Registry operation + * + * @param registryArgs registry Arguments + * @return the instances (for tests) + * @throws YarnException YARN problems + * @throws IOException Network or other problems + */ + @VisibleForTesting + public List<ServiceRecord> actionRegistryListYarn( + ActionRegistryArgs registryArgs) + throws YarnException, IOException { + String serviceType = registryArgs.serviceType; + String name = registryArgs.name; + RegistryOperationsService operations = getRegistryOperations(); + List<ServiceRecord> serviceRecords; + if (StringUtils.isEmpty(name)) { + String serviceclassPath = + BindingUtils.serviceclassPath(BindingUtils.currentUser(), + serviceType); + RegistryPathStatus[] listDir; + listDir = operations.listDir(serviceclassPath); + if (listDir.length == 0) { + throw new PathNotFoundException("records under " + + serviceclassPath); + } + 
serviceRecords = + RecordOperations.extractServiceRecords(operations, listDir); + } else { + ServiceRecord instance = lookupServiceRecord(registryArgs); + serviceRecords = new ArrayList<ServiceRecord>(1); + serviceRecords.add(instance); + } + + for (ServiceRecord serviceRecord : serviceRecords) { + logInstance(serviceRecord, registryArgs.verbose); + } + return serviceRecords; + } + + /** + * Log a service record instance + * @param instance record + * @param verbose verbose logging of all external endpoints + */ + private void logInstance(ServiceRecord instance, + boolean verbose) { + if (!verbose) { + log.info("{}", instance.id); + } else { + log.info("{}: ", instance.id); + logEndpoints(instance); + } + } + + /** + * Log the external endpoints of a service record + * @param instance service record instance + */ + private void logEndpoints(ServiceRecord instance) { + List<Endpoint> endpoints = instance.external; + for (Endpoint endpoint : endpoints) { + log.info(endpoint.toString()); + } + } + private void logInstance(ServiceInstanceData instance, boolean verbose) { if (!verbose) { @@ -2392,6 +2471,34 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe } } } + /** + * list configs available for an instance + * + * @param registryArgs registry Arguments + * @throws YarnException YARN problems + * @throws IOException Network or other problems + */ + public void actionRegistryListConfigsYarn(ActionRegistryArgs registryArgs) + throws YarnException, IOException { + + ServiceRecord instance = lookupServiceRecord(registryArgs); + + RegistryRetriever retriever = new RegistryRetriever(instance); + PublishedConfigSet configurations = + retriever.getConfigurations(!registryArgs.internal); + + for (String configName : configurations.keys()) { + if (!registryArgs.verbose) { + log.info("{}", configName); + } else { + PublishedConfiguration published = + configurations.get(configName); + log.info("{} : {}", + configName, + published.description); 
+ } + } + } /** * list configs available for an instance @@ -2402,6 +2509,7 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe * @throws FileNotFoundException if the config is not found */ @VisibleForTesting + @Deprecated public PublishedConfiguration actionRegistryGetConfig(ActionRegistryArgs registryArgs) throws YarnException, IOException { ServiceInstanceData instance = lookupInstance(registryArgs); @@ -2418,6 +2526,30 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe } /** + * list configs available for an instance + * + * @param registryArgs registry Arguments + * @throws YarnException YARN problems + * @throws IOException Network or other problems + * @throws FileNotFoundException if the config is not found + */ + @VisibleForTesting + public PublishedConfiguration actionRegistryGetConfigYarn(ActionRegistryArgs registryArgs) + throws YarnException, IOException { + ServiceRecord instance = lookupServiceRecord(registryArgs); + + RegistryRetriever retriever = new RegistryRetriever(instance); + boolean external = !registryArgs.internal; + PublishedConfigSet configurations = + retriever.getConfigurations(external); + + PublishedConfiguration published = retriever.retrieveConfiguration(configurations, + registryArgs.getConf, + external); + return published; + } + + /** * write out the config * @param published * @param registryArgs @@ -2495,8 +2627,40 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe } catch (Exception e) { throw new IOException(e); } + } + + + /** + * Look up an instance + * @return instance data + * @throws SliderException other failures + * @throws IOException IO problems or wrapped exceptions + */ + private ServiceRecord lookupServiceRecord(ActionRegistryArgs registryArgs) throws + SliderException, + IOException { + return lookupServiceRecord(registryArgs.name, registryArgs.serviceType); + } + + /** + * Look up an instance + * @param id instance 
ID + * @param serviceType service type + * @return instance data + * @throws UnknownApplicationInstanceException no match + * @throws SliderException other failures + * @throws IOException IO problems or wrapped exceptions + */ + private ServiceRecord lookupServiceRecord(String id, + String serviceType) throws + IOException, SliderException { + return getRegistryOperations().resolve( + BindingUtils.servicePath(BindingUtils.currentUser(), + serviceType, id)); } + + /** * List instances in the registry * @return http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/a0528889/slider-core/src/main/java/org/apache/slider/core/registry/info/CommonRegistryConstants.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/core/registry/info/CommonRegistryConstants.java b/slider-core/src/main/java/org/apache/slider/core/registry/info/CommonRegistryConstants.java deleted file mode 100644 index a286ba4..0000000 --- a/slider-core/src/main/java/org/apache/slider/core/registry/info/CommonRegistryConstants.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.slider.core.registry.info; - -public class CommonRegistryConstants { - - public static final String WEB_UI = "org.apache.http.UI"; - -} http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/a0528889/slider-core/src/main/java/org/apache/slider/core/registry/info/CustomRegistryConstants.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/core/registry/info/CustomRegistryConstants.java b/slider-core/src/main/java/org/apache/slider/core/registry/info/CustomRegistryConstants.java index 5ce5f73..d2658c1 100644 --- a/slider-core/src/main/java/org/apache/slider/core/registry/info/CustomRegistryConstants.java +++ b/slider-core/src/main/java/org/apache/slider/core/registry/info/CustomRegistryConstants.java @@ -31,6 +31,12 @@ public class CustomRegistryConstants { public static final String PUBLISHER_REST_API = "org.apache.slider.publisher"; + public static final String PUBLISHER_CONFIGURATIONS_API = + "org.apache.slider.publisher.configurations"; + + public static final String PUBLISHER_DOCUMENTS_API = + "org.apache.slider.publisher.documents"; + public static final String AGENT_SECURE_REST_API = "org.apache.slider.agents.secure"; @@ -40,4 +46,5 @@ public class CustomRegistryConstants { public static final String AM_IPC_PROTOCOL = "org.apache.slider.appmaster"; + public static final String WEB_UI = "org.apache.http.UI"; } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/a0528889/slider-core/src/main/java/org/apache/slider/core/registry/retrieve/RegistryRetriever.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/core/registry/retrieve/RegistryRetriever.java b/slider-core/src/main/java/org/apache/slider/core/registry/retrieve/RegistryRetriever.java index 4fe2e8f..001bf66 100644 --- 
a/slider-core/src/main/java/org/apache/slider/core/registry/retrieve/RegistryRetriever.java +++ b/slider-core/src/main/java/org/apache/slider/core/registry/retrieve/RegistryRetriever.java @@ -25,11 +25,15 @@ import com.sun.jersey.api.client.WebResource; import com.sun.jersey.api.client.config.ClientConfig; import com.sun.jersey.api.client.config.DefaultClientConfig; import com.sun.jersey.api.json.JSONConfiguration; +import org.apache.hadoop.yarn.registry.client.binding.RegistryTypeUtils; +import org.apache.hadoop.yarn.registry.client.exceptions.InvalidRecordException; +import org.apache.hadoop.yarn.registry.client.types.Endpoint; +import org.apache.hadoop.yarn.registry.client.types.ServiceRecord; import org.apache.slider.common.tools.SliderUtils; import org.apache.slider.core.exceptions.ExceptionConverter; import org.apache.slider.core.registry.docstore.PublishedConfigSet; import org.apache.slider.core.registry.docstore.PublishedConfiguration; -import org.apache.slider.core.registry.info.RegistryView; +import org.apache.slider.core.registry.info.CustomRegistryConstants; import org.apache.slider.core.registry.info.ServiceInstanceData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,7 +50,8 @@ import java.io.IOException; public class RegistryRetriever { private static final Logger log = LoggerFactory.getLogger(RegistryRetriever.class); - private final ServiceInstanceData instance; + private final String externalConfigurationURL; + private final String internalConfigurationURL; private static final Client jerseyClient; static { @@ -60,30 +65,38 @@ public class RegistryRetriever { public RegistryRetriever(ServiceInstanceData instance) { - this.instance = instance; + externalConfigurationURL = instance.externalView.configurationsURL; + internalConfigurationURL = instance.internalView.configurationsURL; } - /** - * Get the appropriate view for the flag - * @param external - * @return - */ - private RegistryView getRegistryView(boolean external) { - return 
external ? instance.externalView : instance.internalView; + + public RegistryRetriever(String externalConfigurationURL, String internalConfigurationURL) { + this.externalConfigurationURL = externalConfigurationURL; + this.internalConfigurationURL = internalConfigurationURL; } - private String destination(boolean external) { - return external ? "external" : "internal"; + public RegistryRetriever(ServiceRecord record) throws InvalidRecordException { + Endpoint internal = record.getInternalEndpoint( + CustomRegistryConstants.PUBLISHER_CONFIGURATIONS_API); + + internalConfigurationURL = RegistryTypeUtils.retrieveAddressUriType( + internal); + Endpoint external = record.getExternalEndpoint( + CustomRegistryConstants.PUBLISHER_CONFIGURATIONS_API); + + externalConfigurationURL = RegistryTypeUtils.retrieveAddressUriType( + external); } + /** * Does a bonded registry retriever have a configuration? * @param external flag to indicate that it is the external entries to fetch * @return true if there is a URL to the configurations defined */ public boolean hasConfigurations(boolean external) { - String confURL = getRegistryView(external).configurationsURL; - return !Strings.isStringEmpty(confURL); + return !Strings.isStringEmpty( + external ? externalConfigurationURL : internalConfigurationURL); } /** @@ -94,11 +107,7 @@ public class RegistryRetriever { public PublishedConfigSet getConfigurations(boolean external) throws FileNotFoundException, IOException { - String confURL = getRegistryView(external).configurationsURL; - if (Strings.isStringEmpty(confURL)) { - throw new FileNotFoundException("No configuration URL at " - + destination(external) + " view"); - } + String confURL = getConfigurationURL(external); try { WebResource webResource = jsonResource(confURL); log.debug("GET {}", confURL); @@ -109,6 +118,14 @@ public class RegistryRetriever { } } + protected String getConfigurationURL(boolean external) throws FileNotFoundException { + String confURL = external ? 
externalConfigurationURL: internalConfigurationURL; + if (Strings.isStringEmpty(confURL)) { + throw new FileNotFoundException("No configuration URL"); + } + return confURL; + } + private WebResource resource(String url) { WebResource resource = jerseyClient.resource(url); return resource; @@ -122,7 +139,7 @@ public class RegistryRetriever { /** * Get a complete configuration, with all values - * @param configSet + * @param configSet config set to ask for * @param name name of the configuration * @param external flag to indicate that it is an external configuration * @return the retrieved config @@ -131,10 +148,10 @@ public class RegistryRetriever { public PublishedConfiguration retrieveConfiguration(PublishedConfigSet configSet, String name, boolean external) throws IOException { + String confURL = getConfigurationURL(external); if (!configSet.contains(name)) { throw new FileNotFoundException("Unknown configuration " + name); } - String confURL = getRegistryView(external).configurationsURL; confURL = SliderUtils.appendToURL(confURL, name); try { WebResource webResource = jsonResource(confURL); @@ -148,7 +165,8 @@ public class RegistryRetriever { @Override public String toString() { - return super.toString() + " - " + instance; + return super.toString() + + " - external " + externalConfigurationURL; } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/a0528889/slider-core/src/main/java/org/apache/slider/providers/slideram/SliderAMProviderService.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/providers/slideram/SliderAMProviderService.java b/slider-core/src/main/java/org/apache/slider/providers/slideram/SliderAMProviderService.java index 2b62d24..f01f995 100644 --- a/slider-core/src/main/java/org/apache/slider/providers/slideram/SliderAMProviderService.java +++ b/slider-core/src/main/java/org/apache/slider/providers/slideram/SliderAMProviderService.java @@ -35,7 +35,6 @@ 
import org.apache.slider.core.exceptions.BadCommandArgumentsException; import org.apache.slider.core.exceptions.SliderException; import org.apache.slider.core.launch.ContainerLauncher; import org.apache.slider.core.registry.docstore.PublishedConfiguration; -import org.apache.slider.core.registry.info.CommonRegistryConstants; import org.apache.slider.core.registry.info.CustomRegistryConstants; import org.apache.slider.core.registry.info.RegisteredEndpoint; import org.apache.slider.core.registry.info.RegistryView; @@ -50,6 +49,7 @@ import org.apache.slider.server.appmaster.web.rest.RestPaths; import java.io.File; import java.io.IOException; +import java.net.URI; import java.net.URISyntaxException; import java.net.URL; import java.util.ArrayList; @@ -161,7 +161,7 @@ public class SliderAMProviderService extends AbstractProviderService implements RestPaths.REGISTRY_SERVICE); RegistryView externalView = instanceData.externalView; - externalView.endpoints.put(CommonRegistryConstants.WEB_UI, webUI); + externalView.endpoints.put(CustomRegistryConstants.WEB_UI, webUI); externalView.endpoints.put( CustomRegistryConstants.MANAGEMENT_REST_API, @@ -184,12 +184,13 @@ public class SliderAMProviderService extends AbstractProviderService implements // Set the configurations URL. 
- externalView.configurationsURL = SliderUtils.appendToURL( + String configurationsURL = SliderUtils.appendToURL( publisherURL.toExternalForm(), RestPaths.SLIDER_CONFIGSET); + externalView.configurationsURL = configurationsURL; serviceRecord.addExternalEndpoint( RegistryTypeUtils.webEndpoint( - CommonRegistryConstants.WEB_UI, amWebURI.toURI())); + CustomRegistryConstants.WEB_UI, amWebURI.toURI())); serviceRecord.addExternalEndpoint( RegistryTypeUtils.restEndpoint( CustomRegistryConstants.MANAGEMENT_REST_API, @@ -198,6 +199,14 @@ public class SliderAMProviderService extends AbstractProviderService implements RegistryTypeUtils.restEndpoint( CustomRegistryConstants.PUBLISHER_REST_API, publisherURL.toURI())); + serviceRecord.addExternalEndpoint( + RegistryTypeUtils.restEndpoint( + CustomRegistryConstants.REGISTRY_REST_API, + registryREST.toURI())); + serviceRecord.addExternalEndpoint( + RegistryTypeUtils.restEndpoint( + CustomRegistryConstants.PUBLISHER_CONFIGURATIONS_API, + new URI(configurationsURL))); } catch (URISyntaxException e) { throw new IOException(e); http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/a0528889/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java index 0c19036..c569438 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java @@ -357,7 +357,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService private SliderAMWebApp webApp; @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") private InetSocketAddress rpcServiceAddress; - private ProviderService sliderAMProvider; + private SliderAMProviderService 
sliderAMProvider; private CertificateManager certificateManager; private WorkflowExecutorService<ExecutorService> executorService; @@ -365,7 +365,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService private final QueueService actionQueues = new QueueService(); private String agentOpsUrl; private String agentStatusUrl; - private YarnRegistryViewForProviders yarnRegistryView; + private YarnRegistryViewForProviders yarnRegistryOperations; /** * Service Constructor @@ -896,12 +896,12 @@ public class SliderAppMaster extends AbstractSliderLaunchedService //Give the provider restricted access to the state, registry setupInitialRegistryPaths(); - yarnRegistryView = new YarnRegistryViewForProviders( + yarnRegistryOperations = new YarnRegistryViewForProviders( registryOperations, service_user_name, SliderKeys.APP_TYPE, instanceName); - providerService.bindToYarnRegistry(yarnRegistryView); - sliderAMProvider.bindToYarnRegistry(yarnRegistryView); + providerService.bindToYarnRegistry(yarnRegistryOperations); + sliderAMProvider.bindToYarnRegistry(yarnRegistryOperations); List<String> serviceInstancesRunning = registry.instanceIDs(serviceName); log.info("service instances already running: {}", serviceInstancesRunning); @@ -921,6 +921,9 @@ public class SliderAppMaster extends AbstractSliderLaunchedService // Yarn registry ServiceRecord serviceRecord = new ServiceRecord(); + String serviceID = appid.toString(); + serviceRecord.id = serviceID; + serviceRecord.description = "Slider Application Master"; serviceRecord.addExternalEndpoint( RegistryTypeUtils.ipcEndpoint( @@ -949,29 +952,35 @@ public class SliderAppMaster extends AbstractSliderLaunchedService registry.registerSelf( instanceData, amWebURI ); - yarnRegistryView.putService(service_user_name, + + log.info("Service Record \n{}", serviceRecord); + yarnRegistryOperations.putService(service_user_name, SliderKeys.APP_TYPE, instanceName, serviceRecord); // and an ephemeral binding to the app - 
yarnRegistryView.putComponent( - "appmaster", + yarnRegistryOperations.putComponent( + RegistryTypeUtils.yarnIdToDnsId(appAttemptID.toString()), serviceRecord, - true); + false); } +/* @Override protected RegistryOperationsService createRegistryOperationsInstance() { return new ResourceManagerRegistryService("YarnRegistry"); } +*/ protected void setupInitialRegistryPaths() throws IOException { - ResourceManagerRegistryService rmRegOperations = - (ResourceManagerRegistryService) registryOperations; - rmRegOperations.createUserPath(service_user_name); + if (registryOperations instanceof ResourceManagerRegistryService) { + ResourceManagerRegistryService rmRegOperations = + (ResourceManagerRegistryService) registryOperations; + rmRegOperations.createUserPath(service_user_name); + } } /** @@ -993,7 +1002,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService cid, description); try { - yarnRegistryView.putComponent(cid, container, true); + yarnRegistryOperations.putComponent(cid, container, true); } catch (IOException e) { log.warn("Failed to register container {}/{}: {}", id, description, e, e); @@ -1013,7 +1022,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService log.info("Unregistering component {}", id); String cid = RegistryTypeUtils.yarnIdToDnsId(id.toString()); try { - yarnRegistryView.rmComponent(cid); + yarnRegistryOperations.rmComponent(cid); } catch (IOException e) { log.warn("Failed to delete container {} : {}", id, e, e); } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/a0528889/slider-core/src/main/java/org/apache/slider/server/services/utility/AbstractSliderLaunchedService.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/services/utility/AbstractSliderLaunchedService.java b/slider-core/src/main/java/org/apache/slider/server/services/utility/AbstractSliderLaunchedService.java index 3dafcd2..74718c9 100644 --- 
a/slider-core/src/main/java/org/apache/slider/server/services/utility/AbstractSliderLaunchedService.java +++ b/slider-core/src/main/java/org/apache/slider/server/services/utility/AbstractSliderLaunchedService.java @@ -18,7 +18,6 @@ package org.apache.slider.server.services.utility; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.registry.client.api.RegistryConstants; import org.apache.hadoop.yarn.registry.client.services.RegistryOperationsService; @@ -46,13 +45,6 @@ public abstract class AbstractSliderLaunchedService extends new YarnConfiguration(); } - @Override - protected void serviceInit(Configuration conf) throws Exception { - String quorum = lookupZKQuorum(); - conf.set(RegistryConstants.KEY_REGISTRY_ZK_QUORUM, quorum); - super.serviceInit(conf); - } - /** * Start the registration service * @return the instance @@ -113,10 +105,9 @@ public abstract class AbstractSliderLaunchedService extends } /** - * Start the YARN registration service - * @param zkConnection - * @param zkPath - * @return + * Create, adopt ,and start the YARN registration service + * @return the registry operations service, already deployed as a child + * of the AbstractSliderLaunchedService instance. 
*/ public RegistryOperationsService startRegistryOperationsService() throws BadConfigException { http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/a0528889/slider-core/src/main/java/org/apache/slider/server/services/yarnregistry/YarnRegistryViewForProviders.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/services/yarnregistry/YarnRegistryViewForProviders.java b/slider-core/src/main/java/org/apache/slider/server/services/yarnregistry/YarnRegistryViewForProviders.java index 69802e8..ef9c3a4 100644 --- a/slider-core/src/main/java/org/apache/slider/server/services/yarnregistry/YarnRegistryViewForProviders.java +++ b/slider-core/src/main/java/org/apache/slider/server/services/yarnregistry/YarnRegistryViewForProviders.java @@ -19,7 +19,7 @@ package org.apache.slider.server.services.yarnregistry; import org.apache.hadoop.yarn.registry.client.binding.BindingUtils; -import org.apache.hadoop.yarn.registry.client.binding.RegistryZKUtils; +import org.apache.hadoop.yarn.registry.client.binding.RegistryPathUtils; import org.apache.hadoop.yarn.registry.client.services.RegistryOperationsService; import org.apache.hadoop.yarn.registry.client.types.CreateFlags; import org.apache.hadoop.yarn.registry.client.types.ServiceRecord; @@ -90,7 +90,7 @@ public class YarnRegistryViewForProviders { boolean ephemeral) throws IOException { String path = BindingUtils.componentPath( user, serviceClass, serviceName, componentName); - registryOperations.mkdir(RegistryZKUtils.parentOf(path), true); + registryOperations.mkdir(RegistryPathUtils.parentOf(path), true); registryOperations.create(path, record, CreateFlags.OVERWRITE + (ephemeral ? 
CreateFlags.EPHEMERAL : 0)); @@ -109,9 +109,10 @@ public class YarnRegistryViewForProviders { String serviceName, ServiceRecord record) throws IOException { + String path = BindingUtils.servicePath( username, serviceClass, serviceName); - registryOperations.mkdir(RegistryZKUtils.parentOf(path), true); + registryOperations.mkdir(RegistryPathUtils.parentOf(path), true); registryOperations.create(path, record, CreateFlags.OVERWRITE); } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/a0528889/slider-core/src/test/groovy/org/apache/slider/agent/standalone/TestStandaloneRegistryAM.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/agent/standalone/TestStandaloneRegistryAM.groovy b/slider-core/src/test/groovy/org/apache/slider/agent/standalone/TestStandaloneRegistryAM.groovy deleted file mode 100644 index 1bc4b97..0000000 --- a/slider-core/src/test/groovy/org/apache/slider/agent/standalone/TestStandaloneRegistryAM.groovy +++ /dev/null @@ -1,367 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.slider.agent.standalone - -import groovy.transform.CompileStatic -import groovy.util.logging.Slf4j -import org.apache.hadoop.yarn.api.records.ApplicationReport -import org.apache.hadoop.yarn.api.records.YarnApplicationState -import org.apache.hadoop.yarn.conf.YarnConfiguration -import org.apache.slider.agent.AgentMiniClusterTestBase -import org.apache.slider.api.ClusterNode -import org.apache.slider.client.SliderClient -import org.apache.slider.common.SliderExitCodes -import org.apache.slider.common.SliderKeys -import org.apache.slider.common.params.ActionRegistryArgs -import org.apache.slider.core.main.ServiceLauncher -import org.apache.slider.core.persist.JsonSerDeser -import org.apache.slider.core.registry.docstore.PublishedConfigSet -import org.apache.slider.core.registry.docstore.PublishedConfiguration -import org.apache.slider.core.registry.docstore.UriMap -import org.apache.slider.core.registry.info.CustomRegistryConstants -import org.apache.slider.core.registry.info.ServiceInstanceData -import org.apache.slider.core.registry.retrieve.RegistryRetriever -import org.apache.slider.server.appmaster.PublishedArtifacts -import org.apache.slider.server.appmaster.web.rest.RestPaths -import org.apache.slider.server.services.curator.CuratorServiceInstance -import org.apache.slider.server.services.registry.SliderRegistryService -import org.junit.Test - -/** - * create masterless AMs and work with them. 
This is faster than - * bringing up full clusters - */ -@CompileStatic -@Slf4j - -class TestStandaloneRegistryAM extends AgentMiniClusterTestBase { - - - public static final String ARTIFACT_NAME = PublishedArtifacts.COMPLETE_CONFIG - - @Test - public void testRegistryAM() throws Throwable { - - - describe "create a masterless AM then perform registry operations on it" - - //launch fake master - String clustername = createMiniCluster(configuration, 1, true) - ServiceLauncher<SliderClient> launcher - launcher = createStandaloneAM(clustername, true, false) - SliderClient client = launcher.service - addToTeardown(client); - - ApplicationReport report = waitForClusterLive(client) - logReport(report) - List<ApplicationReport> apps = client.applications; - - List<ClusterNode> clusterNodes = client.listClusterNodesInRole( - SliderKeys.COMPONENT_AM) - assert ((List<ClusterNode>)clusterNodes).size() == 1 - - ClusterNode masterNode = clusterNodes[0] - log.info("Master node = ${masterNode}"); - - List<ClusterNode> nodes - String[] uuids = client.listNodeUUIDsByRole(SliderKeys.COMPONENT_AM) - assert uuids.length == 1; - nodes = client.listClusterNodes(uuids); - assert ((List<ClusterNode>)nodes).size() == 1; - describe "AM Node UUID=${uuids[0]}" - - nodes = listNodesInRole(client, SliderKeys.COMPONENT_AM) - assert ((List<ClusterNode>)nodes).size() == 1; - nodes = listNodesInRole(client, "") - assert ((List<ClusterNode>)nodes).size() == 1; - ClusterNode master = nodes[0] - assert master.role == SliderKeys.COMPONENT_AM - - - - - String username = client.username - def yarnRegistryClient = client.yarnAppListClient - describe("list of all applications") - logApplications(apps) - describe("apps of user $username") - List<ApplicationReport> userInstances = yarnRegistryClient.listInstances() - logApplications(userInstances) - assert userInstances.size() == 1 - describe("named app $clustername") - ApplicationReport instance = yarnRegistryClient.findInstance(clustername) - 
logReport(instance) - assert instance != null - - //switch to the ZK-based registry - - - log.info("slider service registry: \n${client.dumpSliderRegistry(true)}\n") - log.info("yarn service registry: \n${client.dumpYarnRegistry(true)}\n") - - - describe "service registry names" - SliderRegistryService registryService = client.registry - def serviceTypes = registryService.serviceTypes; - dumpRegistryServiceTypes(serviceTypes) - - List<String> instanceIds = client.listRegisteredSliderInstances() - - - dumpRegistryInstanceIDs(instanceIds) - assert instanceIds.size() == 1 - - List<CuratorServiceInstance<ServiceInstanceData>> instances = - client.listRegistryInstances() - dumpRegistryInstances(instances) - - assert instances.size() == 1 - - def amInstance = instances[0] - def serviceInstanceData = amInstance.payload - - def externalEndpoints = serviceInstanceData.externalView.endpoints - - // hit the registry web page - - def registryEndpoint = externalEndpoints.get( - CustomRegistryConstants.REGISTRY_REST_API) - assert registryEndpoint != null - def registryURL = registryEndpoint.asURL() - describe("Registry WADL @ $registryURL") - - def publisherEndpoint = externalEndpoints.get(CustomRegistryConstants.PUBLISHER_REST_API) - assert publisherEndpoint != null - def publisherURL = publisherEndpoint.asURL() - def publisher = publisherURL.toString() - describe("Publisher") - - JsonSerDeser<UriMap> uriMapDeser = new JsonSerDeser<>(UriMap) - def setlisting = GET(publisherURL) - - log.info(setlisting) - - UriMap uris = uriMapDeser.fromJson(setlisting) - assert uris.uris[RestPaths.SLIDER_CONFIGSET] - def publishedJSON = GET(publisherURL, RestPaths.SLIDER_CONFIGSET) - JsonSerDeser< PublishedConfigSet> serDeser= new JsonSerDeser<>( - PublishedConfigSet) - def configSet = serDeser.fromJson(publishedJSON) - assert configSet.size() >= 1 - assert configSet.contains(ARTIFACT_NAME) - PublishedConfiguration publishedYarnSite = configSet.get(ARTIFACT_NAME) - - assert 
publishedYarnSite.empty - - //get the full URL - def yarnSitePublisher = appendToURL(publisher, - RestPaths.SLIDER_CONFIGSET, - ARTIFACT_NAME) - - String confJSON = GET(yarnSitePublisher) -// log.info(confJSON) - JsonSerDeser< PublishedConfiguration> confSerDeser = - new JsonSerDeser<PublishedConfiguration>(PublishedConfiguration) - - publishedYarnSite = confSerDeser.fromJson(confJSON) - - assert !publishedYarnSite.empty - - - //get the XML - def yarnSiteXML = yarnSitePublisher + ".xml" - - - String confXML = GET(yarnSiteXML) - log.info("Conf XML at $yarnSiteXML = \n $confXML") - - String properties = GET(yarnSitePublisher + ".properties") - Properties parsedProps = new Properties() - parsedProps.load(new StringReader(properties)) - assert parsedProps.size() > 0 - def rmAddrFromDownloadedProperties = parsedProps.get(YarnConfiguration.RM_ADDRESS) - def rmHostnameFromDownloadedProperties = parsedProps.get(YarnConfiguration.RM_HOSTNAME) - assert rmAddrFromDownloadedProperties - assert rmHostnameFromDownloadedProperties - - String json = GET(yarnSitePublisher + ".json") - - - - describe("Registry List") - log.info(GET(registryURL)) - - - describe "Registry Retrieval Class" - // retrieval - - RegistryRetriever retriever = new RegistryRetriever(serviceInstanceData) - log.info retriever.toString() - - assert retriever.hasConfigurations(true) - PublishedConfigSet externalConfSet = retriever.getConfigurations(true) - dumpConfigurationSet(externalConfSet) - assert externalConfSet[ARTIFACT_NAME] - - - describe "verify SLIDER-52 processing" - def yarnSite = retriever.retrieveConfiguration( - externalConfSet, - ARTIFACT_NAME, - true) - assert !yarnSite.empty - def siteXML = yarnSite.asConfiguration() - def rmHostnameViaClientSideXML = parsedProps.get( - YarnConfiguration.RM_HOSTNAME) - assert rmHostnameViaClientSideXML == rmHostnameFromDownloadedProperties - def rmAddrViaClientSideXML = siteXML.get(YarnConfiguration.RM_ADDRESS) - - log.info("RM from downloaded props = 
$rmAddrFromDownloadedProperties") - assert rmAddrViaClientSideXML == rmAddrFromDownloadedProperties - - describe "fetch missing artifact" - try { - retriever.retrieveConfiguration(externalConfSet, "no-such-artifact", true) - fail("expected a failure") - } catch (FileNotFoundException expected) { - // expected - } - describe "Internal configurations" - assert !retriever.hasConfigurations(false) - try { - retriever.getConfigurations(false) - fail("expected a failure") - } catch (FileNotFoundException expected) { - // expected - } - - - // retrieval via API - ActionRegistryArgs registryArgs = new ActionRegistryArgs() - registryArgs.verbose = true - - // list all - registryArgs.list = true; - describe registryArgs.toString() - client.actionRegistry(registryArgs) - - // list a named instance and expect a failure - registryArgs.list = true; - registryArgs.name = "unknown" - try { - client.actionRegistryList(registryArgs) - } catch (FileNotFoundException expected) { - // expected - } - - // list all instances of an alternate type and expect failure - registryArgs.list = true; - registryArgs.name = null - registryArgs.serviceType = "org.apache.hadoop" - try { - client.actionRegistryList(registryArgs) - } catch (FileNotFoundException expected) { - // expected - } - - registryArgs.serviceType = "" - try { - client.actionRegistryList(registryArgs) - } catch (FileNotFoundException expected) { - // expected - } - //set the name - registryArgs.name = serviceInstanceData.id; - registryArgs.serviceType = SliderKeys.APP_TYPE - - - //now expect list to work - describe registryArgs.toString() - - def listedInstance = client.actionRegistryList(registryArgs) - assert listedInstance[0].id == serviceInstanceData.id - - - // listconf - registryArgs.list = false; - registryArgs.listConf = true - describe registryArgs.toString() - - client.actionRegistry(registryArgs) - - // listconf --internal - registryArgs.list = false; - registryArgs.listConf = true - registryArgs.internal = true - 
describe registryArgs.toString() - assert SliderExitCodes.EXIT_NOT_FOUND == client.actionRegistry(registryArgs) - - registryArgs.list = false; - registryArgs.listConf = false - registryArgs.internal = false - - def yarn_site_config = PublishedArtifacts.YARN_SITE_CONFIG - registryArgs.getConf = yarn_site_config - - //properties format - registryArgs.format = "properties" - describe registryArgs.toString() - - client.actionRegistry(registryArgs) - - - File outputDir = new File("target/test_standalone_registry_am/output") - outputDir.mkdirs() - - // create a new registry args with the defaults back in - registryArgs = new ActionRegistryArgs(serviceInstanceData.id) - registryArgs.getConf = yarn_site_config - registryArgs.dest = outputDir - describe registryArgs.toString() - client.actionRegistry(registryArgs) - assert new File(outputDir, yarn_site_config + ".xml").exists() - - registryArgs.format = "properties" - client.actionRegistry(registryArgs) - assert new File(outputDir, yarn_site_config + ".properties").exists() - - describe registryArgs.toString() - - def unknownFilename = "undefined-file" - registryArgs.getConf = unknownFilename - assert SliderExitCodes.EXIT_NOT_FOUND == client.actionRegistry(registryArgs) - - describe "freeze cluster" - //now kill that cluster - assert 0 == clusterActionFreeze(client, clustername) - //list it & See if it is still there - ApplicationReport oldInstance = yarnRegistryClient.findInstance( - clustername) - assert oldInstance != null - assert oldInstance.yarnApplicationState >= YarnApplicationState.FINISHED - - - sleep(20000) - - // now verify that the service is not in the registry - instances = client.listRegistryInstances() - assert instances.size() == 0 - - } -} http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/a0528889/slider-core/src/test/groovy/org/apache/slider/agent/standalone/TestYarnRegistryAM.groovy ---------------------------------------------------------------------- diff --git 
a/slider-core/src/test/groovy/org/apache/slider/agent/standalone/TestYarnRegistryAM.groovy b/slider-core/src/test/groovy/org/apache/slider/agent/standalone/TestYarnRegistryAM.groovy new file mode 100644 index 0000000..7bdf927 --- /dev/null +++ b/slider-core/src/test/groovy/org/apache/slider/agent/standalone/TestYarnRegistryAM.groovy @@ -0,0 +1,376 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.slider.agent.standalone + +import groovy.transform.CompileStatic +import groovy.util.logging.Slf4j +import org.apache.hadoop.fs.PathNotFoundException +import org.apache.hadoop.yarn.api.records.ApplicationReport +import org.apache.hadoop.yarn.api.records.YarnApplicationState +import org.apache.hadoop.yarn.conf.YarnConfiguration +import org.apache.hadoop.yarn.registry.client.api.RegistryConstants +import org.apache.hadoop.yarn.registry.client.binding.RecordOperations +import org.apache.hadoop.yarn.registry.client.binding.RegistryTypeUtils +import org.apache.hadoop.yarn.registry.client.types.RegistryPathStatus + +import static org.apache.hadoop.yarn.registry.client.binding.BindingUtils.* +import org.apache.slider.agent.AgentMiniClusterTestBase +import org.apache.slider.api.ClusterNode +import org.apache.slider.client.SliderClient +import org.apache.slider.common.SliderExitCodes +import org.apache.slider.common.SliderKeys +import org.apache.slider.common.params.ActionRegistryArgs +import org.apache.slider.core.main.ServiceLauncher +import org.apache.slider.core.persist.JsonSerDeser +import org.apache.slider.core.registry.docstore.PublishedConfigSet +import org.apache.slider.core.registry.docstore.PublishedConfiguration +import org.apache.slider.core.registry.docstore.UriMap +import org.apache.slider.core.registry.info.CustomRegistryConstants +import org.apache.slider.core.registry.retrieve.RegistryRetriever +import org.apache.slider.server.appmaster.PublishedArtifacts +import org.apache.slider.server.appmaster.web.rest.RestPaths +import org.junit.Test + +/** + * work with a YARN registry + */ +@CompileStatic +@Slf4j + +class TestYarnRegistryAM extends AgentMiniClusterTestBase { + + + public static final String ARTIFACT_NAME = PublishedArtifacts.COMPLETE_CONFIG + + @Test + public void testYarnRegistryAM() throws Throwable { + + + describe "create a masterless AM then perform YARN registry operations on it" + + + String clustername = 
createMiniCluster(configuration, 1, true) + + // get local binding + def registryOperations = microZKCluster.registryOperations + registryOperations.stat(RegistryConstants.PATH_SYSTEM_SERVICES) + + // verify the cluster has the YARN reg service live + def rmRegistryService = miniCluster.getResourceManager(0).getRMContext().registry + assert rmRegistryService + + + + + + ServiceLauncher<SliderClient> launcher + launcher = createStandaloneAM(clustername, true, false) + SliderClient client = launcher.service + addToTeardown(client); + + ApplicationReport report = waitForClusterLive(client) + logReport(report) + List<ApplicationReport> apps = client.applications; + + List<ClusterNode> clusterNodes = client.listClusterNodesInRole( + SliderKeys.COMPONENT_AM) + assert ((List<ClusterNode>)clusterNodes).size() == 1 + + ClusterNode masterNode = clusterNodes[0] + log.info("Master node = ${masterNode}"); + + List<ClusterNode> nodes + String[] uuids = client.listNodeUUIDsByRole(SliderKeys.COMPONENT_AM) + assert uuids.length == 1; + nodes = client.listClusterNodes(uuids); + assert ((List<ClusterNode>)nodes).size() == 1; + describe "AM Node UUID=${uuids[0]}" + + nodes = listNodesInRole(client, SliderKeys.COMPONENT_AM) + assert ((List<ClusterNode>)nodes).size() == 1; + nodes = listNodesInRole(client, "") + assert ((List<ClusterNode>)nodes).size() == 1; + ClusterNode master = nodes[0] + assert master.role == SliderKeys.COMPONENT_AM + + + + + String username = client.username + def yarnRegistryClient = client.yarnAppListClient + describe("list of all applications") + logApplications(apps) + describe("apps of user $username") + List<ApplicationReport> userInstances = yarnRegistryClient.listInstances() + logApplications(userInstances) + assert userInstances.size() == 1 + describe("named app $clustername") + ApplicationReport instance = yarnRegistryClient.findInstance(clustername) + logReport(instance) + assert instance != null + + //switch to the ZK-based registry + + + try { + def 
yarnRegistryDump = client.dumpYarnRegistry(true).toString() + log.info("yarn service registry: \n${yarnRegistryDump}\n") + } catch (IOException ignored) { + + } + + + describe "service registry names" + def registryService = client.registryOperations + + def self = currentUser() + RegistryPathStatus[] serviceTypes = registryService.listDir(userPath(self)) + dumpArray(serviceTypes) + + def recordsPath = serviceclassPath(self, SliderKeys.APP_TYPE) + + def serviceRecords = RecordOperations.extractServiceRecords(registryService, + registryService.listDir(recordsPath)) + dumpCollection(serviceRecords) + assert serviceRecords.size() == 1 + + def serviceInstance = serviceRecords[0] + log.info(serviceInstance.toString()) + + assert 2 <= serviceInstance.external.size() + + // hit the registry web page + + def registryEndpoint = serviceInstance.getExternalEndpoint( + CustomRegistryConstants.REGISTRY_REST_API) + assert registryEndpoint != null + def registryURL = RegistryTypeUtils.retrieveAddressURL(registryEndpoint) + describe("Registry WADL @ $registryURL") + + def publisherEndpoint = serviceInstance.getExternalEndpoint( + CustomRegistryConstants.PUBLISHER_REST_API) + + def publisherURL = RegistryTypeUtils.retrieveAddressURL(publisherEndpoint) + def publisher = publisherURL.toString() + describe("Publisher") + + JsonSerDeser<UriMap> uriMapDeser = new JsonSerDeser<>(UriMap) + def setlisting = GET(publisherURL) + + log.info(setlisting) + + UriMap uris = uriMapDeser.fromJson(setlisting) + assert uris.uris[RestPaths.SLIDER_CONFIGSET] + def publishedJSON = GET(publisherURL, RestPaths.SLIDER_CONFIGSET) + JsonSerDeser< PublishedConfigSet> serDeser= new JsonSerDeser<>( + PublishedConfigSet) + def configSet = serDeser.fromJson(publishedJSON) + assert configSet.size() >= 1 + assert configSet.contains(ARTIFACT_NAME) + PublishedConfiguration publishedYarnSite = configSet.get(ARTIFACT_NAME) + + assert publishedYarnSite.empty + + //get the full URL + def yarnSitePublisher = 
appendToURL(publisher, + RestPaths.SLIDER_CONFIGSET, + ARTIFACT_NAME) + + String confJSON = GET(yarnSitePublisher) +// log.info(confJSON) + JsonSerDeser< PublishedConfiguration> confSerDeser = + new JsonSerDeser<PublishedConfiguration>(PublishedConfiguration) + + publishedYarnSite = confSerDeser.fromJson(confJSON) + + assert !publishedYarnSite.empty + + + //get the XML + def yarnSiteXML = yarnSitePublisher + ".xml" + + + String confXML = GET(yarnSiteXML) + log.info("Conf XML at $yarnSiteXML = \n $confXML") + + String properties = GET(yarnSitePublisher + ".properties") + Properties parsedProps = new Properties() + parsedProps.load(new StringReader(properties)) + assert parsedProps.size() > 0 + def rmAddrFromDownloadedProperties = parsedProps.get(YarnConfiguration.RM_ADDRESS) + def rmHostnameFromDownloadedProperties = parsedProps.get(YarnConfiguration.RM_HOSTNAME) + assert rmAddrFromDownloadedProperties + assert rmHostnameFromDownloadedProperties + + String json = GET(yarnSitePublisher + ".json") + + + + describe("Registry List") + log.info(GET(registryURL)) + + + describe "Registry Retrieval Class" + // retrieval + + RegistryRetriever retriever = new RegistryRetriever(serviceInstance) + log.info retriever.toString() + + assert retriever.hasConfigurations(true) + PublishedConfigSet externalConfSet = retriever.getConfigurations(true) + dumpConfigurationSet(externalConfSet) + assert externalConfSet[ARTIFACT_NAME] + + + describe "verify SLIDER-52 processing" + def yarnSite = retriever.retrieveConfiguration( + externalConfSet, + ARTIFACT_NAME, + true) + assert !yarnSite.empty + def siteXML = yarnSite.asConfiguration() + def rmHostnameViaClientSideXML = parsedProps.get( + YarnConfiguration.RM_HOSTNAME) + assert rmHostnameViaClientSideXML == rmHostnameFromDownloadedProperties + def rmAddrViaClientSideXML = siteXML.get(YarnConfiguration.RM_ADDRESS) + + log.info("RM from downloaded props = $rmAddrFromDownloadedProperties") + assert rmAddrViaClientSideXML == 
rmAddrFromDownloadedProperties + + describe "fetch missing artifact" + try { + retriever.retrieveConfiguration(externalConfSet, "no-such-artifact", true) + fail("expected a failure") + } catch (FileNotFoundException expected) { + // expected + } + describe "Internal configurations" + assert !retriever.hasConfigurations(false) + try { + retriever.getConfigurations(false) + fail("expected a failure") + } catch (FileNotFoundException expected) { + // expected + } + + + // retrieval via API + ActionRegistryArgs registryArgs = new ActionRegistryArgs() + registryArgs.verbose = true + + // list all + registryArgs.list = true; + describe registryArgs.toString() + client.actionRegistry(registryArgs) + + // list a named instance and expect a failure + registryArgs.list = true; + registryArgs.name = "unknown" + try { + client.actionRegistryListYarn(registryArgs) + } catch (PathNotFoundException expected) { + // expected + } + + // list all instances of an alternate type and expect failure + registryArgs.list = true; + registryArgs.name = null + registryArgs.serviceType = "org-apache-hadoop" + try { + client.actionRegistryListYarn(registryArgs) + } catch (PathNotFoundException expected) { + // expected + } + + registryArgs.serviceType = "" + + //set the name + registryArgs.name = clustername; + registryArgs.serviceType = SliderKeys.APP_TYPE + + + //now expect list to work + describe registryArgs.toString() + + def listedInstance = client.actionRegistryListYarn(registryArgs) + assert listedInstance[0].id == serviceInstance.id + + + // listconf + registryArgs.list = false; + registryArgs.listConf = true + describe registryArgs.toString() + + client.actionRegistry(registryArgs) + + // listconf --internal + registryArgs.list = false; + registryArgs.listConf = true + registryArgs.internal = true + describe registryArgs.toString() + assert SliderExitCodes.EXIT_NOT_FOUND == client.actionRegistry(registryArgs) + + registryArgs.list = false; + registryArgs.listConf = false + 
registryArgs.internal = false + + def yarn_site_config = PublishedArtifacts.YARN_SITE_CONFIG + registryArgs.getConf = yarn_site_config + + //properties format + registryArgs.format = "properties" + describe registryArgs.toString() + + client.actionRegistry(registryArgs) + + + File outputDir = new File("target/test_standalone_registry_am/output") + outputDir.mkdirs() + + // create a new registry args with the defaults back in + registryArgs = new ActionRegistryArgs(clustername) + registryArgs.getConf = yarn_site_config + registryArgs.dest = outputDir + describe registryArgs.toString() + client.actionRegistry(registryArgs) + assert new File(outputDir, yarn_site_config + ".xml").exists() + + registryArgs.format = "properties" + client.actionRegistry(registryArgs) + assert new File(outputDir, yarn_site_config + ".properties").exists() + + describe registryArgs.toString() + + def unknownFilename = "undefined-file" + registryArgs.getConf = unknownFilename + assert SliderExitCodes.EXIT_NOT_FOUND == client.actionRegistry(registryArgs) + + describe "freeze cluster" + //now kill that cluster + assert 0 == clusterActionFreeze(client, clustername) + //list it & See if it is still there + ApplicationReport oldInstance = yarnRegistryClient.findInstance( + clustername) + assert oldInstance != null + assert oldInstance.yarnApplicationState >= YarnApplicationState.FINISHED + + + + } +} http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/a0528889/slider-core/src/test/groovy/org/apache/slider/test/MicroZKCluster.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/test/MicroZKCluster.groovy b/slider-core/src/test/groovy/org/apache/slider/test/MicroZKCluster.groovy index cb1d9b5..7f72490 100644 --- a/slider-core/src/test/groovy/org/apache/slider/test/MicroZKCluster.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/test/MicroZKCluster.groovy @@ -21,18 +21,20 @@ package 
org.apache.slider.test import groovy.transform.CompileStatic import groovy.util.logging.Slf4j import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.yarn.registry.client.api.RegistryOperations +import org.apache.hadoop.yarn.registry.client.services.RegistryOperationsService +import org.apache.hadoop.yarn.registry.server.services.MicroZookeeperService import org.apache.slider.common.tools.SliderUtils -import org.apache.slider.core.zk.MiniZooKeeperCluster @Slf4j @CompileStatic class MicroZKCluster implements Closeable { public static final String HOSTS = "127.0.0.1" - MiniZooKeeperCluster zkCluster + MicroZookeeperService zkService String zkBindingString - Configuration conf - int port + final Configuration conf + public RegistryOperations registryOperations MicroZKCluster() { this(SliderUtils.createConfiguration()) @@ -43,20 +45,29 @@ class MicroZKCluster implements Closeable { } void createCluster() { - zkCluster = new MiniZooKeeperCluster(1) - zkCluster.init(conf) - zkCluster.start() - zkBindingString = zkCluster.zkQuorum + zkService = new MicroZookeeperService("zookeeper") + + zkService.init(conf) + zkService.start() + zkBindingString = zkService.connectionString log.info("Created $this") + registryOperations = new RegistryOperationsService( + "registry", + zkService) + registryOperations.init(conf) + registryOperations.start() } @Override void close() throws IOException { - zkCluster?.stop() + registryOperations?.stop() + zkService?.stop() } @Override String toString() { return "Micro ZK cluster as $zkBindingString" } + + } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/a0528889/slider-core/src/test/groovy/org/apache/slider/test/SliderTestUtils.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/test/SliderTestUtils.groovy b/slider-core/src/test/groovy/org/apache/slider/test/SliderTestUtils.groovy index 0036db7..2f8e50a 100644 --- 
a/slider-core/src/test/groovy/org/apache/slider/test/SliderTestUtils.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/test/SliderTestUtils.groovy @@ -51,6 +51,8 @@ import org.apache.slider.server.services.curator.CuratorServiceInstance import org.junit.Assert import org.junit.Assume +import java.lang.reflect.Array + import static Arguments.ARG_OPTION /** @@ -708,11 +710,15 @@ class SliderTestUtils extends Assert { dumpCollection(entries) } - def static void dumpCollection(Collection<String> entries) { + def static void dumpCollection(Collection entries) { log.info("number of entries: ${entries.size()}") - entries.each { String it -> log.info(it) } + entries.each { log.info(it.toString()) } } + def static void dumpArray(Object[] entries) { + log.info("number of entries: ${entries.length}") + entries.each { log.info(it.toString()) } + } /** * Get a time option in seconds if set, otherwise the default value (also in seconds). * This operation picks up the time value as a system property if set -that http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/a0528889/slider-core/src/test/groovy/org/apache/slider/test/YarnMiniClusterTestBase.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/test/YarnMiniClusterTestBase.groovy b/slider-core/src/test/groovy/org/apache/slider/test/YarnMiniClusterTestBase.groovy index eab1b09..81034d2 100644 --- a/slider-core/src/test/groovy/org/apache/slider/test/YarnMiniClusterTestBase.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/test/YarnMiniClusterTestBase.groovy @@ -49,16 +49,12 @@ import org.apache.slider.core.main.ServiceLauncher import org.apache.slider.core.main.ServiceLauncherBaseTest import org.apache.slider.server.appmaster.SliderAppMaster import org.junit.After -import org.junit.Assert -import org.junit.Before import org.junit.BeforeClass import org.junit.Rule -import org.junit.rules.TestName import 
org.junit.rules.Timeout import static org.apache.slider.test.KeysForTests.* -import static org.apache.slider.common.SliderKeys.*; import static org.apache.slider.common.SliderXMLConfKeysForTesting.*; /** * Base class for mini cluster tests -creates a field for the @@ -245,7 +241,8 @@ public abstract class YarnMiniClusterTestBase extends ServiceLauncherBaseTest { conf.set(YarnConfiguration.RM_SCHEDULER, FIFO_SCHEDULER); SliderUtils.patchConfiguration(conf) name = buildClustername(name) - miniCluster = new MiniYARNCluster(name, noOfNodeManagers, numLocalDirs, numLogDirs) + miniCluster = + new MiniYARNCluster(name, noOfNodeManagers, numLocalDirs, numLogDirs, 1, false, false) miniCluster.init(conf) miniCluster.start(); if (startHDFS) { http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/a0528889/slider-core/src/test/groovy/org/apache/slider/test/YarnZKMiniClusterTestBase.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/test/YarnZKMiniClusterTestBase.groovy b/slider-core/src/test/groovy/org/apache/slider/test/YarnZKMiniClusterTestBase.groovy index 0259fb7..bfb32c7 100644 --- a/slider-core/src/test/groovy/org/apache/slider/test/YarnZKMiniClusterTestBase.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/test/YarnZKMiniClusterTestBase.groovy @@ -23,6 +23,7 @@ import groovy.util.logging.Slf4j import org.apache.hadoop.conf.Configuration import org.apache.hadoop.io.IOUtils import org.apache.hadoop.yarn.conf.YarnConfiguration +import org.apache.hadoop.yarn.registry.client.api.RegistryConstants import org.apache.slider.common.SliderXmlConfKeys import org.apache.slider.core.zk.BlockingZKWatcher import org.apache.slider.core.zk.ZKIntegration @@ -86,13 +87,13 @@ public abstract class YarnZKMiniClusterTestBase extends YarnMiniClusterTestBase } /** - * Create and start a minicluster + * Create and start a minicluster with ZK * @param name cluster/test name * @param conf 
configuration to use * @param noOfNodeManagers #of NMs * @param numLocalDirs #of local dirs * @param numLogDirs #of log dirs - * @param startZK create a ZK micro cluster + * @param startZK create a ZK micro cluster *THIS IS IGNORED* * @param startHDFS create an HDFS mini cluster */ protected String createMiniCluster(String name, @@ -102,12 +103,12 @@ public abstract class YarnZKMiniClusterTestBase extends YarnMiniClusterTestBase int numLogDirs, boolean startZK, boolean startHDFS) { - name = createMiniCluster(name, conf, noOfNodeManagers, numLocalDirs, numLogDirs, + createMicroZKCluster(conf) + conf.setBoolean(RegistryConstants.KEY_REGISTRY_ENABLED, true) + conf.set(RegistryConstants.KEY_REGISTRY_ZK_QUORUM, ZKBinding) + name = super.createMiniCluster(name, conf, noOfNodeManagers, numLocalDirs, numLogDirs, startHDFS) - if (startZK) { - createMicroZKCluster(conf) - } return name } @@ -155,14 +156,6 @@ public abstract class YarnZKMiniClusterTestBase extends YarnMiniClusterTestBase } } - protected int getZKPort() { - return microZKCluster ? microZKCluster.port : 2181; - } - - protected String getZKHosts() { - return MicroZKCluster.HOSTS; - } - /** * CLI args include all the ZK bindings needed * @return