SLIDER-320 agent provider registers endpoints and triggers a publish operation, 
as does app state on an NM container start event. Nothing happens at the moment.


Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/31bfae26
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/31bfae26
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/31bfae26

Branch: refs/heads/feature/SLIDER-151_REST_API
Commit: 31bfae26a5ee44794cd72fd681dbf69d237e18f8
Parents: 7e2ce27
Author: Steve Loughran <ste...@apache.org>
Authored: Fri Aug 15 15:18:42 2014 +0100
Committer: Steve Loughran <ste...@apache.org>
Committed: Fri Aug 15 15:18:42 2014 +0100

----------------------------------------------------------------------
 .../providers/agent/AgentProviderService.java   | 32 +++++++++++++--
 .../server/appmaster/SliderAppMaster.java       | 24 +++++++++--
 .../actions/PublishRegistryDetails.java         |  3 +-
 .../slider/server/appmaster/state/AppState.java | 43 +++++++++++++++++---
 .../appmaster/state/ProviderAppState.java       |  5 +++
 .../server/appmaster/state/RoleInstance.java    | 42 +++++++++++++++++--
 .../state/StateAccessForProviders.java          |  7 ++++
 .../providers/hbase/HBaseProviderService.java   | 11 ++---
 8 files changed, 144 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/31bfae26/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
----------------------------------------------------------------------
diff --git 
a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
 
b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
index 91bc0a6..680074d 100644
--- 
a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
+++ 
b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
@@ -42,6 +42,7 @@ import org.apache.slider.core.conf.ConfTreeOperations;
 import org.apache.slider.core.conf.MapOperations;
 import org.apache.slider.core.exceptions.BadCommandArgumentsException;
 import org.apache.slider.core.exceptions.BadConfigException;
+import org.apache.slider.core.exceptions.NoSuchNodeException;
 import org.apache.slider.core.exceptions.SliderException;
 import org.apache.slider.core.launch.CommandLineBuilder;
 import org.apache.slider.core.launch.ContainerLauncher;
@@ -62,7 +63,9 @@ import 
org.apache.slider.providers.agent.application.metadata.Metainfo;
 import org.apache.slider.providers.agent.application.metadata.OSPackage;
 import org.apache.slider.providers.agent.application.metadata.OSSpecific;
 import 
org.apache.slider.server.appmaster.actions.ProviderReportedContainerLoss;
+import org.apache.slider.server.appmaster.actions.PublishRegistryDetails;
 import org.apache.slider.server.appmaster.state.ContainerPriority;
+import org.apache.slider.server.appmaster.state.RoleInstance;
 import org.apache.slider.server.appmaster.state.StateAccessForProviders;
 import org.apache.slider.server.appmaster.web.rest.agent.AgentCommandType;
 import org.apache.slider.server.appmaster.web.rest.agent.AgentRestOperations;
@@ -96,6 +99,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static 
org.apache.slider.server.appmaster.web.rest.RestPaths.SLIDER_PATH_AGENTS;
@@ -541,14 +545,36 @@ public class AgentProviderService extends 
AbstractProviderService implements
                                      String roleName,
                                      String containerId,
                                      Map<String, String> ports) {
+    RoleInstance instance;
+    try {
+      instance = getAmState().getOwnedContainer(containerId);
+    } catch (NoSuchNodeException e) {
+      log.warn("Failed to locate instance of container {}: {}", containerId, 
e);
+      instance = null;
+    }
     for (Map.Entry<String, String> port : ports.entrySet()) {
-      log.info("Recording allocated port for {} as {}", port.getKey(), 
port.getValue());
-      this.getAllocatedPorts().put(port.getKey(), port.getValue());
-      this.getAllocatedPorts(containerId).put(port.getKey(), port.getValue());
+      String portname = port.getKey();
+      String portNo = port.getValue();
+      log.info("Recording allocated port for {} as {}", portname, portNo);
+      this.getAllocatedPorts().put(portname, portNo);
+      this.getAllocatedPorts(containerId).put(portname, portNo);
+        if (instance!=null) {
+          try {
+            instance.registerPortEndpoint(Integer.valueOf(portNo), portname, 
"");
+          } catch (NumberFormatException e) {
+            log.warn("Failed to parse {}: {}", portNo, e);
+          }
+        }
     }
 
     // component specific publishes
     processAndPublishComponentSpecificData(ports, containerId, fqdn, roleName);
+    
+    // and update registration entries
+    if (instance != null) {
+      queueAccess.put(new PublishRegistryDetails(instance.getId(), 0,
+          TimeUnit.MILLISECONDS));
+    }
   }
 
   private void updateComponentStatusWithAgentState(

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/31bfae26/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
----------------------------------------------------------------------
diff --git 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
index 9a16188..c54a268 100644
--- 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
+++ 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
@@ -84,7 +84,6 @@ import org.apache.slider.core.exceptions.SliderException;
 import org.apache.slider.core.exceptions.SliderInternalStateException;
 import org.apache.slider.core.exceptions.TriggerClusterTeardownException;
 import org.apache.slider.core.main.ExitCodeProvider;
-import org.apache.slider.core.main.LauncherExitCodes;
 import org.apache.slider.core.main.RunService;
 import org.apache.slider.core.main.ServiceLauncher;
 import org.apache.slider.core.persist.ConfTreeSerDeser;
@@ -99,6 +98,7 @@ import org.apache.slider.providers.SliderProviderFactory;
 import org.apache.slider.providers.slideram.SliderAMClientProvider;
 import org.apache.slider.providers.slideram.SliderAMProviderService;
 import org.apache.slider.server.appmaster.actions.ActionKillContainer;
+import org.apache.slider.server.appmaster.actions.PublishRegistryDetails;
 import org.apache.slider.server.appmaster.actions.QueueExecutor;
 import org.apache.slider.server.appmaster.actions.ActionHalt;
 import org.apache.slider.server.appmaster.actions.QueueService;
@@ -901,6 +901,20 @@ public class SliderAppMaster extends 
AbstractSliderLaunchedService
   }
 
   /**
+   * Register/re-register a component (that is already in the app state)
+   * @param id the component
+   */
+  public boolean registerComponent(ContainerId id) {
+    RoleInstance instance = appState.getOwnedContainer(id);
+    if (instance == null) {
+      return false;
+    }
+    // this is where component registrations will go
+
+    return true;
+  }
+  
+  /**
    * looks for a specific case where a token file is provided as an environment
    * variable, yet the file is not there.
    * 
@@ -1192,9 +1206,10 @@ public class SliderAppMaster extends 
AbstractSliderLaunchedService
     throws IOException, SliderInternalStateException, BadConfigException {
 
     appState.updateResourceDefinitions(resources);
-    appState.resetFailureCounts();
-    // reset the scheduled window resetter...the values
+
+    // reset the scheduled windows...the values
     // may have changed
+    appState.resetFailureCounts();
     
 
 
@@ -1665,6 +1680,9 @@ public class SliderAppMaster extends 
AbstractSliderLaunchedService
       //trigger an async container status
       nmClientAsync.getContainerStatusAsync(containerId,
                                             cinfo.container.getNodeId());
+      // push out a registration
+      queue(new PublishRegistryDetails(containerId, 0, TimeUnit.MILLISECONDS));
+      
     } else {
       //this is a hypothetical path not seen. We react by warning
       log.error("Notified of started container that isn't pending {} - 
releasing",

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/31bfae26/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/PublishRegistryDetails.java
----------------------------------------------------------------------
diff --git 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/PublishRegistryDetails.java
 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/PublishRegistryDetails.java
index f96183f..cb6d140 100644
--- 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/PublishRegistryDetails.java
+++ 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/PublishRegistryDetails.java
@@ -31,7 +31,7 @@ public class PublishRegistryDetails extends AsyncAction {
 
   public PublishRegistryDetails(ContainerId containerId, long delay,
       TimeUnit timeUnit) {
-    super("PublishEndpointDetails :" + containerId.toString(),
+    super("PublishRegistryDetails :" + containerId.toString(),
         delay, timeUnit);
     this.containerId = containerId;
   }
@@ -40,6 +40,5 @@ public class PublishRegistryDetails extends AsyncAction {
   public void execute(SliderAppMaster appMaster,
       QueueAccess queueService,
       AppState appState) throws Exception {
-    
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/31bfae26/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
----------------------------------------------------------------------
diff --git 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
index 2f2eadd..07976ef 100644
--- 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
+++ 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
@@ -850,20 +850,53 @@ public class AppState {
 
   /**
    * Lookup live instance by string value of container ID
-   * @param containerId container ID
+   * @param containerId container ID as a string
    * @return the role instance for that container
    * @throws NoSuchNodeException if it does not exist
    */
   public synchronized RoleInstance getLiveInstanceByContainerID(String 
containerId)
-    throws NoSuchNodeException {
+      throws NoSuchNodeException {
     Collection<RoleInstance> nodes = getLiveNodes().values();
+    return findNodeInCollection(containerId, nodes);
+  }
+
+  /**
+   * Lookup owned instance by string value of container ID
+   * @param containerId container ID as a string
+   * @return the role instance for that container
+   * @throws NoSuchNodeException if it does not exist
+   */
+  public synchronized RoleInstance getOwnedInstanceByContainerID(String 
containerId)
+      throws NoSuchNodeException {
+    Collection<RoleInstance> nodes = ownedContainers.values();
+    return findNodeInCollection(containerId, nodes);
+  }
+
+  
+  
+  /**
+   * Iterate through a collection of role instances to find one with a
+   * specific (string) container ID
+   * @param containerId container ID as a string
+   * @param nodes collection
+   * @return the first role instance whose ID matches
+   * @throws NoSuchNodeException if there was no match
+   */
+  private RoleInstance findNodeInCollection(String containerId,
+      Collection<RoleInstance> nodes) throws NoSuchNodeException {
+    RoleInstance found = null;
     for (RoleInstance node : nodes) {
       if (containerId.equals(node.id)) {
-        return node;
+        found = node;
+        break;
       }
     }
-    //at this point: no node
-    throw new NoSuchNodeException(containerId);
+    if (found != null) {
+      return found;
+    } else {
+      //at this point: no node
+      throw new NoSuchNodeException(containerId);
+    }
   }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/31bfae26/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ProviderAppState.java
----------------------------------------------------------------------
diff --git 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ProviderAppState.java
 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ProviderAppState.java
index e7384c8..a0871ae 100644
--- 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ProviderAppState.java
+++ 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ProviderAppState.java
@@ -179,6 +179,11 @@ public class ProviderAppState implements 
StateAccessForProviders {
   }
 
   @Override
+  public RoleInstance getOwnedContainer(String id) throws NoSuchNodeException {
+    return appState.getOwnedInstanceByContainerID(id);
+  }
+
+  @Override
   public List<RoleInstance> cloneLiveContainerInfoList() {
     return appState.cloneLiveContainerInfoList();
   }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/31bfae26/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleInstance.java
----------------------------------------------------------------------
diff --git 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleInstance.java
 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleInstance.java
index 8b6ed8e..e373843 100644
--- 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleInstance.java
+++ 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleInstance.java
@@ -25,8 +25,12 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.slider.api.ClusterDescription;
 import org.apache.slider.api.proto.Messages;
 import org.apache.slider.common.tools.SliderUtils;
+import org.apache.slider.core.registry.info.RegisteredEndpoint;
 
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 
 /**
  * Tracking information about a container
@@ -82,11 +86,12 @@ public final class RoleInstance implements Cloneable {
   public String host;
   public String hostURL;
 
+
   /**
-   * Any information the provider wishes to retain on the state of
-   * an instance
+   * A list of registered endpoints.
    */
-  public Object providerInfo;
+  private List<RegisteredEndpoint> endpoints =
+      new ArrayList<RegisteredEndpoint>(2);
 
   public RoleInstance(Container container) {
     Preconditions.checkNotNull(container, "Null container");
@@ -188,8 +193,39 @@ public final class RoleInstance implements Cloneable {
   @Override
   public Object clone() throws CloneNotSupportedException {
     RoleInstance cloned = (RoleInstance) super.clone();
+    // clone the endpoint list, but not the values
+    cloned.endpoints = new ArrayList<RegisteredEndpoint>(this.endpoints);
     return cloned;
   }
 
+  /**
+   * Get the list of endpoints. 
+   * @return the endpoint list.
+   */
+  public List<RegisteredEndpoint> getEndpoints() {
+    return endpoints;
+  }
+
+  /**
+   * Add an endpoint registration
+   * @param endpoint endpoint to add; must not be null
+   */
+  public void addEndpoint(RegisteredEndpoint endpoint) {
+    Preconditions.checkArgument(endpoint != null);
+    endpoints.add(endpoint);
+  }
 
+  /**
+   * Register a port endpoint as an inet-addr formatted endpoint, using the
+   * hostname as the first part of the address
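+   * @param port port used for the socket address
+   * @param protocol protocol passed to the registered endpoint
+   * @param text text passed to the registered endpoint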
+   */
+  public void registerPortEndpoint(int port, String protocol, String text) {
+    InetSocketAddress addr = new InetSocketAddress(host, port);
+    RegisteredEndpoint epr = new RegisteredEndpoint(addr, protocol, text);
+    addEndpoint(epr);
+  }
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/31bfae26/slider-core/src/main/java/org/apache/slider/server/appmaster/state/StateAccessForProviders.java
----------------------------------------------------------------------
diff --git 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/StateAccessForProviders.java
 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/StateAccessForProviders.java
index b236dfc..1714f75 100644
--- 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/StateAccessForProviders.java
+++ 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/StateAccessForProviders.java
@@ -160,6 +160,13 @@ public interface StateAccessForProviders {
   RoleInstance getOwnedContainer(ContainerId id);
 
   /**
+   * Get any active container with the given ID
+   * @param id container Id
+   * @return the active container (never null; throws if it is not found)
+   */
+  RoleInstance getOwnedContainer(String id) throws NoSuchNodeException;
+
+  /**
    * Create a clone of the list of live cluster nodes.
    * @return the list of nodes, may be empty
    */

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/31bfae26/slider-providers/hbase/slider-hbase-provider/src/main/java/org/apache/slider/providers/hbase/HBaseProviderService.java
----------------------------------------------------------------------
diff --git 
a/slider-providers/hbase/slider-hbase-provider/src/main/java/org/apache/slider/providers/hbase/HBaseProviderService.java
 
b/slider-providers/hbase/slider-hbase-provider/src/main/java/org/apache/slider/providers/hbase/HBaseProviderService.java
index 1fb392c..82e535f 100644
--- 
a/slider-providers/hbase/slider-hbase-provider/src/main/java/org/apache/slider/providers/hbase/HBaseProviderService.java
+++ 
b/slider-providers/hbase/slider-hbase-provider/src/main/java/org/apache/slider/providers/hbase/HBaseProviderService.java
@@ -68,11 +68,8 @@ import static 
org.apache.slider.server.appmaster.web.rest.RestPaths.SLIDER_PATH_
  * This class implements the server-side aspects
  * of an HBase Cluster
  */
-public class HBaseProviderService extends AbstractProviderService implements
-                                                                  ProviderCore,
-                                                                  HBaseKeys,
-    SliderKeys,
-    AgentRestOperations{
+public class HBaseProviderService extends AbstractProviderService 
+    implements ProviderCore, HBaseKeys, SliderKeys, AgentRestOperations{
 
   protected static final Logger log =
     LoggerFactory.getLogger(HBaseProviderService.class);
@@ -109,8 +106,8 @@ public class HBaseProviderService extends 
AbstractProviderService implements
    * @param instanceDefinition the instance definition to validate
    */
   @Override // Client and Server
-  public void validateInstanceDefinition(AggregateConf instanceDefinition) 
throws
-      SliderException {
+  public void validateInstanceDefinition(AggregateConf instanceDefinition) 
+      throws SliderException {
     clientProvider.validateInstanceDefinition(instanceDefinition);
   }
 

Reply via email to