SLIDER-320: the agent provider registers endpoints and triggers a publish operation, as does the app state on an NM container start event. The publish action itself is not yet implemented, so nothing actually happens at the moment.
Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/31bfae26 Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/31bfae26 Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/31bfae26 Branch: refs/heads/feature/SLIDER-151_REST_API Commit: 31bfae26a5ee44794cd72fd681dbf69d237e18f8 Parents: 7e2ce27 Author: Steve Loughran <ste...@apache.org> Authored: Fri Aug 15 15:18:42 2014 +0100 Committer: Steve Loughran <ste...@apache.org> Committed: Fri Aug 15 15:18:42 2014 +0100 ---------------------------------------------------------------------- .../providers/agent/AgentProviderService.java | 32 +++++++++++++-- .../server/appmaster/SliderAppMaster.java | 24 +++++++++-- .../actions/PublishRegistryDetails.java | 3 +- .../slider/server/appmaster/state/AppState.java | 43 +++++++++++++++++--- .../appmaster/state/ProviderAppState.java | 5 +++ .../server/appmaster/state/RoleInstance.java | 42 +++++++++++++++++-- .../state/StateAccessForProviders.java | 7 ++++ .../providers/hbase/HBaseProviderService.java | 11 ++--- 8 files changed, 144 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/31bfae26/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java index 91bc0a6..680074d 100644 --- a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java +++ b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java @@ -42,6 +42,7 @@ import org.apache.slider.core.conf.ConfTreeOperations; import 
org.apache.slider.core.conf.MapOperations; import org.apache.slider.core.exceptions.BadCommandArgumentsException; import org.apache.slider.core.exceptions.BadConfigException; +import org.apache.slider.core.exceptions.NoSuchNodeException; import org.apache.slider.core.exceptions.SliderException; import org.apache.slider.core.launch.CommandLineBuilder; import org.apache.slider.core.launch.ContainerLauncher; @@ -62,7 +63,9 @@ import org.apache.slider.providers.agent.application.metadata.Metainfo; import org.apache.slider.providers.agent.application.metadata.OSPackage; import org.apache.slider.providers.agent.application.metadata.OSSpecific; import org.apache.slider.server.appmaster.actions.ProviderReportedContainerLoss; +import org.apache.slider.server.appmaster.actions.PublishRegistryDetails; import org.apache.slider.server.appmaster.state.ContainerPriority; +import org.apache.slider.server.appmaster.state.RoleInstance; import org.apache.slider.server.appmaster.state.StateAccessForProviders; import org.apache.slider.server.appmaster.web.rest.agent.AgentCommandType; import org.apache.slider.server.appmaster.web.rest.agent.AgentRestOperations; @@ -96,6 +99,7 @@ import java.util.Map; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import static org.apache.slider.server.appmaster.web.rest.RestPaths.SLIDER_PATH_AGENTS; @@ -541,14 +545,36 @@ public class AgentProviderService extends AbstractProviderService implements String roleName, String containerId, Map<String, String> ports) { + RoleInstance instance; + try { + instance = getAmState().getOwnedContainer(containerId); + } catch (NoSuchNodeException e) { + log.warn("Failed to locate instance of container {}: {}", containerId, e); + instance = null; + } for (Map.Entry<String, String> port : ports.entrySet()) { - log.info("Recording allocated port for {} as {}", port.getKey(), 
port.getValue()); - this.getAllocatedPorts().put(port.getKey(), port.getValue()); - this.getAllocatedPorts(containerId).put(port.getKey(), port.getValue()); + String portname = port.getKey(); + String portNo = port.getValue(); + log.info("Recording allocated port for {} as {}", portname, portNo); + this.getAllocatedPorts().put(portname, portNo); + this.getAllocatedPorts(containerId).put(portname, portNo); + if (instance!=null) { + try { + instance.registerPortEndpoint(Integer.valueOf(portNo), portname, ""); + } catch (NumberFormatException e) { + log.warn("Failed to parse {}: {}", portNo, e); + } + } } // component specific publishes processAndPublishComponentSpecificData(ports, containerId, fqdn, roleName); + + // and update registration entries + if (instance != null) { + queueAccess.put(new PublishRegistryDetails(instance.getId(), 0, + TimeUnit.MILLISECONDS)); + } } private void updateComponentStatusWithAgentState( http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/31bfae26/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java index 9a16188..c54a268 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java @@ -84,7 +84,6 @@ import org.apache.slider.core.exceptions.SliderException; import org.apache.slider.core.exceptions.SliderInternalStateException; import org.apache.slider.core.exceptions.TriggerClusterTeardownException; import org.apache.slider.core.main.ExitCodeProvider; -import org.apache.slider.core.main.LauncherExitCodes; import org.apache.slider.core.main.RunService; import org.apache.slider.core.main.ServiceLauncher; import 
org.apache.slider.core.persist.ConfTreeSerDeser; @@ -99,6 +98,7 @@ import org.apache.slider.providers.SliderProviderFactory; import org.apache.slider.providers.slideram.SliderAMClientProvider; import org.apache.slider.providers.slideram.SliderAMProviderService; import org.apache.slider.server.appmaster.actions.ActionKillContainer; +import org.apache.slider.server.appmaster.actions.PublishRegistryDetails; import org.apache.slider.server.appmaster.actions.QueueExecutor; import org.apache.slider.server.appmaster.actions.ActionHalt; import org.apache.slider.server.appmaster.actions.QueueService; @@ -901,6 +901,20 @@ public class SliderAppMaster extends AbstractSliderLaunchedService } /** + * Register/re-register a component (that is already in the app state + * @param id the component + */ + public boolean registerComponent(ContainerId id) { + RoleInstance instance = appState.getOwnedContainer(id); + if (instance == null) { + return false; + } + // this is where component registrations will go + + return true; + } + + /** * looks for a specific case where a token file is provided as an environment * variable, yet the file is not there. * @@ -1192,9 +1206,10 @@ public class SliderAppMaster extends AbstractSliderLaunchedService throws IOException, SliderInternalStateException, BadConfigException { appState.updateResourceDefinitions(resources); - appState.resetFailureCounts(); - // reset the scheduled window resetter...the values + + // reset the scheduled windows...the values // may have changed + appState.resetFailureCounts(); @@ -1665,6 +1680,9 @@ public class SliderAppMaster extends AbstractSliderLaunchedService //trigger an async container status nmClientAsync.getContainerStatusAsync(containerId, cinfo.container.getNodeId()); + // push out a registration + queue(new PublishRegistryDetails(containerId, 0, TimeUnit.MILLISECONDS)); + } else { //this is a hypothetical path not seen. 
We react by warning log.error("Notified of started container that isn't pending {} - releasing", http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/31bfae26/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/PublishRegistryDetails.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/PublishRegistryDetails.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/PublishRegistryDetails.java index f96183f..cb6d140 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/PublishRegistryDetails.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/PublishRegistryDetails.java @@ -31,7 +31,7 @@ public class PublishRegistryDetails extends AsyncAction { public PublishRegistryDetails(ContainerId containerId, long delay, TimeUnit timeUnit) { - super("PublishEndpointDetails :" + containerId.toString(), + super("PublishRegistryDetails :" + containerId.toString(), delay, timeUnit); this.containerId = containerId; } @@ -40,6 +40,5 @@ public class PublishRegistryDetails extends AsyncAction { public void execute(SliderAppMaster appMaster, QueueAccess queueService, AppState appState) throws Exception { - } } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/31bfae26/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java index 2f2eadd..07976ef 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java @@ -850,20 +850,53 @@ public class AppState { /** * Lookup live instance by string 
value of container ID - * @param containerId container ID + * @param containerId container ID as a string * @return the role instance for that container * @throws NoSuchNodeException if it does not exist */ public synchronized RoleInstance getLiveInstanceByContainerID(String containerId) - throws NoSuchNodeException { + throws NoSuchNodeException { Collection<RoleInstance> nodes = getLiveNodes().values(); + return findNodeInCollection(containerId, nodes); + } + + /** + * Lookup owned instance by string value of container ID + * @param containerId container ID as a string + * @return the role instance for that container + * @throws NoSuchNodeException if it does not exist + */ + public synchronized RoleInstance getOwnedInstanceByContainerID(String containerId) + throws NoSuchNodeException { + Collection<RoleInstance> nodes = ownedContainers.values(); + return findNodeInCollection(containerId, nodes); + } + + + + /** + * Iterate through a collection of role instances to find one with a + * specific (string) container ID + * @param containerId container ID as a string + * @param nodes collection + * @return + * @throws NoSuchNodeException if there was no match + */ + private RoleInstance findNodeInCollection(String containerId, + Collection<RoleInstance> nodes) throws NoSuchNodeException { + RoleInstance found = null; for (RoleInstance node : nodes) { if (containerId.equals(node.id)) { - return node; + found = node; + break; } } - //at this point: no node - throw new NoSuchNodeException(containerId); + if (found != null) { + return found; + } else { + //at this point: no node + throw new NoSuchNodeException(containerId); + } } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/31bfae26/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ProviderAppState.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ProviderAppState.java 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ProviderAppState.java index e7384c8..a0871ae 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ProviderAppState.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ProviderAppState.java @@ -179,6 +179,11 @@ public class ProviderAppState implements StateAccessForProviders { } @Override + public RoleInstance getOwnedContainer(String id) throws NoSuchNodeException { + return appState.getOwnedInstanceByContainerID(id); + } + + @Override public List<RoleInstance> cloneLiveContainerInfoList() { return appState.cloneLiveContainerInfoList(); } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/31bfae26/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleInstance.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleInstance.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleInstance.java index 8b6ed8e..e373843 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleInstance.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleInstance.java @@ -25,8 +25,12 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.slider.api.ClusterDescription; import org.apache.slider.api.proto.Messages; import org.apache.slider.common.tools.SliderUtils; +import org.apache.slider.core.registry.info.RegisteredEndpoint; +import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; /** * Tracking information about a container @@ -82,11 +86,12 @@ public final class RoleInstance implements Cloneable { public String host; public String hostURL; + /** - * Any information the provider wishes to retain on the state of - * an instance + * A list of registered endpoints. 
*/ - public Object providerInfo; + private List<RegisteredEndpoint> endpoints = + new ArrayList<RegisteredEndpoint>(2); public RoleInstance(Container container) { Preconditions.checkNotNull(container, "Null container"); @@ -188,8 +193,39 @@ public final class RoleInstance implements Cloneable { @Override public Object clone() throws CloneNotSupportedException { RoleInstance cloned = (RoleInstance) super.clone(); + // clone the endpoint list, but not the values + cloned.endpoints = new ArrayList<RegisteredEndpoint>(this.endpoints); return cloned; } + /** + * Get the list of endpoints. + * @return the endpoint list. + */ + public List<RegisteredEndpoint> getEndpoints() { + return endpoints; + } + + /** + * Add an endpoint registration + * @param endpoint + */ + public void addEndpoint(RegisteredEndpoint endpoint) { + Preconditions.checkArgument(endpoint != null); + endpoints.add(endpoint); + } + /** + * Register a port endpoint as an inet-addr formatted endpoint, using the + * hostname as the first part of the address + * @param port + * @param protocol + * @param text + */ + public void registerPortEndpoint(int port, String protocol, String text) { + InetSocketAddress addr = new InetSocketAddress(host, port); + RegisteredEndpoint epr = new RegisteredEndpoint(addr, protocol, text); + addEndpoint(epr); + } + } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/31bfae26/slider-core/src/main/java/org/apache/slider/server/appmaster/state/StateAccessForProviders.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/StateAccessForProviders.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/StateAccessForProviders.java index b236dfc..1714f75 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/StateAccessForProviders.java +++ 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/StateAccessForProviders.java @@ -160,6 +160,13 @@ public interface StateAccessForProviders { RoleInstance getOwnedContainer(ContainerId id); /** + * Get any active container with the given ID + * @param id container Id + * @return the active container or null if it is not found + */ + RoleInstance getOwnedContainer(String id) throws NoSuchNodeException; + + /** * Create a clone of the list of live cluster nodes. * @return the list of nodes, may be empty */ http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/31bfae26/slider-providers/hbase/slider-hbase-provider/src/main/java/org/apache/slider/providers/hbase/HBaseProviderService.java ---------------------------------------------------------------------- diff --git a/slider-providers/hbase/slider-hbase-provider/src/main/java/org/apache/slider/providers/hbase/HBaseProviderService.java b/slider-providers/hbase/slider-hbase-provider/src/main/java/org/apache/slider/providers/hbase/HBaseProviderService.java index 1fb392c..82e535f 100644 --- a/slider-providers/hbase/slider-hbase-provider/src/main/java/org/apache/slider/providers/hbase/HBaseProviderService.java +++ b/slider-providers/hbase/slider-hbase-provider/src/main/java/org/apache/slider/providers/hbase/HBaseProviderService.java @@ -68,11 +68,8 @@ import static org.apache.slider.server.appmaster.web.rest.RestPaths.SLIDER_PATH_ * This class implements the server-side aspects * of an HBase Cluster */ -public class HBaseProviderService extends AbstractProviderService implements - ProviderCore, - HBaseKeys, - SliderKeys, - AgentRestOperations{ +public class HBaseProviderService extends AbstractProviderService + implements ProviderCore, HBaseKeys, SliderKeys, AgentRestOperations{ protected static final Logger log = LoggerFactory.getLogger(HBaseProviderService.class); @@ -109,8 +106,8 @@ public class HBaseProviderService extends AbstractProviderService implements * @param instanceDefinition the 
instance definition to validate */ @Override // Client and Server - public void validateInstanceDefinition(AggregateConf instanceDefinition) throws - SliderException { + public void validateInstanceDefinition(AggregateConf instanceDefinition) + throws SliderException { clientProvider.validateInstanceDefinition(instanceDefinition); }