SLIDER-82 setting up node listings into AppState binding
Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/1312bc33 Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/1312bc33 Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/1312bc33 Branch: refs/heads/feature/SLIDER-82-pass-3.1 Commit: 1312bc33c6b4739c6be96ecc95fa3f3391baf73f Parents: 9cda83b Author: Steve Loughran <ste...@apache.org> Authored: Wed Nov 4 18:11:28 2015 +0000 Committer: Steve Loughran <ste...@apache.org> Committed: Thu Nov 5 13:19:03 2015 +0000 ---------------------------------------------------------------------- .../slider/client/SliderYarnClientImpl.java | 18 +++++++++-- .../server/appmaster/SliderAppMaster.java | 34 ++++++++++++-------- .../appmaster/state/AppStateBindingInfo.java | 3 ++ .../slider/agent/rest/TestStandaloneREST.groovy | 4 +-- .../appmaster/model/mock/MockYarnCluster.groovy | 11 +++---- 5 files changed, 44 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1312bc33/slider-core/src/main/java/org/apache/slider/client/SliderYarnClientImpl.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/client/SliderYarnClientImpl.java b/slider-core/src/main/java/org/apache/slider/client/SliderYarnClientImpl.java index 42759fd..803ccd6 100644 --- a/slider-core/src/main/java/org/apache/slider/client/SliderYarnClientImpl.java +++ b/slider-core/src/main/java/org/apache/slider/client/SliderYarnClientImpl.java @@ -20,6 +20,7 @@ package org.apache.slider.client; import com.google.common.base.Preconditions; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; @@ -29,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; @@ -42,6 +44,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.net.BindException; import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; @@ -64,6 +67,17 @@ public class SliderYarnClientImpl extends YarnClientImpl { */ public static final String KILL_ALL = "all"; + @Override + protected void serviceInit(Configuration conf) throws Exception { + String addr = conf.get(YarnConfiguration.RM_ADDRESS); + if (addr.startsWith("0.0.0.0")) { + // address isn't known; fail fast + throw new BindException("Invalid " + YarnConfiguration.RM_ADDRESS + " value:" + addr + + " - see https://wiki.apache.org/hadoop/UnsetHostnameOrPort"); + } + super.serviceInit(conf); + } + /** * Get the RM Client RPC interface * @return an RPC interface valid after initialization and authentication @@ -107,7 +121,6 @@ public class SliderYarnClientImpl extends YarnClientImpl { return results; } - /** * find all instances of a specific app -if there is more than one in the * YARN cluster, @@ -141,8 +154,7 @@ public class SliderYarnClientImpl extends YarnClientImpl { public boolean isApplicationLive(ApplicationReport app) { Preconditions.checkArgument(app != null, "Null app report"); - return app.getYarnApplicationState().ordinal() <= - YarnApplicationState.RUNNING.ordinal(); + return app.getYarnApplicationState().ordinal() <= YarnApplicationState.RUNNING.ordinal(); } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1312bc33/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java index b552290..e6a5bd5 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java @@ -57,9 +57,9 @@ import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; import org.apache.hadoop.yarn.client.api.async.NMClientAsync; @@ -68,7 +68,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import static org.apache.hadoop.yarn.conf.YarnConfiguration.*; import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.registry.client.api.RegistryOperations; import org.apache.hadoop.registry.client.binding.RegistryPathUtils; @@ -173,6 +172,7 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; +import java.net.BindException; import java.net.InetSocketAddress; import java.net.URI; import java.net.URL; @@ -215,8 +215,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService */ protected static final Logger LOG_YARN = log; - public static final String SERVICE_CLASSNAME_SHORT = - "SliderAppMaster"; + public static final String SERVICE_CLASSNAME_SHORT = "SliderAppMaster"; public static final String SERVICE_CLASSNAME = "org.apache.slider.server.appmaster." + SERVICE_CLASSNAME_SHORT; @@ -495,8 +494,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService metrics.registerAll(new GarbageCollectorMetricSet()); */ - contentCache = ApplicationResouceContentCacheFactory.createContentCache( - stateForProviders); + contentCache = ApplicationResouceContentCacheFactory.createContentCache(stateForProviders); executorService = new WorkflowExecutorService<>("AmExecutor", Executors.newFixedThreadPool(2, @@ -504,6 +502,14 @@ public class SliderAppMaster extends AbstractSliderLaunchedService addService(executorService); addService(actionQueues); + // set up the YARN client. This may require patching in the RM client-API address if it + // is (somehow) unset server-side. + String clientRMaddr = conf.get(YarnConfiguration.RM_ADDRESS); + if (clientRMaddr.startsWith("0.0.0.0")) { + // address isn't known; fail fast + throw new BindException("Invalid " + YarnConfiguration.RM_ADDRESS + " value:" + addr + + " - see https://wiki.apache.org/hadoop/UnsetHostnameOrPort"); + } addService(yarnClient = new SliderYarnClientImpl()); //init all child services @@ -564,8 +570,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService //dump the system properties if in debug mode if (log.isDebugEnabled()) { - log.debug("System properties:\n" + - SliderUtils.propertiesToString(System.getProperties())); + log.debug("System properties:\n" + SliderUtils.propertiesToString(System.getProperties())); } //choose the action @@ -634,8 +639,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService // obtain security state securityEnabled = securityConfiguration.isSecurityEnabled(); // set the global security flag for the instance definition - instanceDefinition.getAppConfOperations().set( - KEY_SECURITY_ENABLED, securityEnabled); + instanceDefinition.getAppConfOperations().set(KEY_SECURITY_ENABLED, securityEnabled); // triggers resolution and snapshotting for agent appState.setInitialInstanceDefinition(instanceDefinition); @@ -653,8 +657,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService InternalKeys.INTERNAL_PROVIDER_NAME); log.info("Cluster provider type is {}", providerType); SliderProviderFactory factory = - SliderProviderFactory.createSliderProviderFactory( - providerType); + SliderProviderFactory.createSliderProviderFactory(providerType); providerService = factory.createServerProvider(); // init the provider BUT DO NOT START IT YET initAndAddService(providerService); @@ -673,8 +676,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService * turned into an (incompete) container */ appMasterContainerID = ConverterUtils.toContainerId( - SliderUtils.mandatoryEnvVariable( - ApplicationConstants.Environment.CONTAINER_ID.name())); + SliderUtils.mandatoryEnvVariable(ApplicationConstants.Environment.CONTAINER_ID.name())); appAttemptID = appMasterContainerID.getApplicationAttemptId(); ApplicationId appid = appAttemptID.getApplicationId(); @@ -687,6 +689,9 @@ public class SliderAppMaster extends AbstractSliderLaunchedService Map<String, String> envVars; List<Container> liveContainers; + List<NodeReport> nodeReports = yarnClient.getNodeReports(NodeState.RUNNING); + log.info("Yarn node report count: {}", nodeReports.size()); + /* * It is critical this section is synchronized, to stop async AM events * arriving while registering a restarting AM. @@ -844,6 +849,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService binding.liveContainers = liveContainers; binding.applicationInfo = appInformation; binding.releaseSelector = providerService.createContainerReleaseSelector(); + binding.nodeReports = nodeReports; appState.buildInstance(binding); providerService.rebuildContainerDetails(liveContainers, http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1312bc33/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppStateBindingInfo.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppStateBindingInfo.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppStateBindingInfo.java index 184c8aa..a2a0b60 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppStateBindingInfo.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppStateBindingInfo.java @@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.slider.core.conf.AggregateConf; import org.apache.slider.providers.ProviderRole; @@ -46,6 +47,8 @@ public class AppStateBindingInfo { public List<Container> liveContainers = new ArrayList<>(0); public Map<String, String> applicationInfo = new HashMap<>(); public ContainerReleaseSelector releaseSelector = new SimpleReleaseSelector(); + /** node reports off the RM. If null, the app state needs to be given a node update later */ + public List<NodeReport> nodeReports = new ArrayList<>(0); public void validate() throws IllegalArgumentException { Preconditions.checkArgument(instanceDefinition != null, "null instanceDefinition"); http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1312bc33/slider-core/src/test/groovy/org/apache/slider/agent/rest/TestStandaloneREST.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/agent/rest/TestStandaloneREST.groovy b/slider-core/src/test/groovy/org/apache/slider/agent/rest/TestStandaloneREST.groovy index 97b3009..29fa51a 100644 --- a/slider-core/src/test/groovy/org/apache/slider/agent/rest/TestStandaloneREST.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/agent/rest/TestStandaloneREST.groovy @@ -41,7 +41,6 @@ import static org.apache.slider.server.appmaster.web.rest.RestPaths.* @Slf4j class TestStandaloneREST extends AgentMiniClusterTestBase { - @Test public void testStandaloneREST() throws Throwable { @@ -65,8 +64,7 @@ class TestStandaloneREST extends AgentMiniClusterTestBase { ApplicationReport report = waitForClusterLive(client) def proxyAM = report.trackingUrl def directAM = report.originalTrackingUrl - - + // set up url config to match initHttpTestSupport(launcher.configuration) http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1312bc33/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockYarnCluster.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockYarnCluster.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockYarnCluster.groovy index 6056e3a..99a9213 100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockYarnCluster.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockYarnCluster.groovy @@ -147,14 +147,15 @@ public class MockYarnCluster { /** * Model cluster nodes on the simpler "slot" model than the YARN-era - * resource allocation model. Why? Makes it easier to implement. + * resource allocation model. Why? Easier to implement scheduling. + * Of course, if someone does want to implement the full process... * - * When a cluster is offline, */ public static class MockYarnClusterNode { public final int nodeIndex public final String hostname; + public List<String> labels = [] public final MockNodeId nodeId; public final MockYarnClusterContainer[] containers; private boolean offline; @@ -230,8 +231,6 @@ public class MockYarnCluster { } return result } - - /** * Release a container @@ -291,8 +290,8 @@ public class MockYarnCluster { return (hostIndex << 8) | containerIndex & 0xff; } - public static final int extractHost(int cid) { - return (cid >>> 8); + public static final int extractHost(long cid) { + return (cid >>> 8) & 0xffff; } public static final int extractContainer(int cid) {