SLIDER-219. Multiple dynamic ports may be asked as part of the same config value


Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/dca4818c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/dca4818c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/dca4818c

Branch: refs/heads/feature/SLIDER-149_Support_a_YARN_service_registry
Commit: dca4818c551c8fff655c8898c0786aae2fb0fb6a
Parents: ad2c798
Author: Sumit Mohanty <smoha...@hortonworks.com>
Authored: Fri Aug 1 11:52:35 2014 -0700
Committer: Sumit Mohanty <smoha...@hortonworks.com>
Committed: Tue Aug 5 18:51:58 2014 -0700

----------------------------------------------------------------------
 .../slider/providers/ProviderService.java       |   7 +
 .../providers/agent/AgentProviderService.java   | 329 ++++++++++---------
 .../server/appmaster/SliderAppMaster.java       |   2 +
 .../agent/TestAgentProviderService.java         |  49 ++-
 4 files changed, 224 insertions(+), 163 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/dca4818c/slider-core/src/main/java/org/apache/slider/providers/ProviderService.java
----------------------------------------------------------------------
diff --git 
a/slider-core/src/main/java/org/apache/slider/providers/ProviderService.java 
b/slider-core/src/main/java/org/apache/slider/providers/ProviderService.java
index 56e24e9..b9fa34c 100644
--- a/slider-core/src/main/java/org/apache/slider/providers/ProviderService.java
+++ b/slider-core/src/main/java/org/apache/slider/providers/ProviderService.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.slider.api.ClusterDescription;
 import org.apache.slider.common.tools.SliderFileSystem;
 import org.apache.slider.core.conf.AggregateConf;
@@ -69,6 +70,12 @@ public interface ProviderService extends ProviderCore, 
Service,
       SliderException;
 
   /**
+   * Notify the providers of container completion
+   * @param containerId
+   */
+  void notifyContainerCompleted(ContainerId containerId);
+
+  /**
    * Execute a process in the AM
    * @param instanceDefinition cluster description
    * @param confDir configuration directory

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/dca4818c/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
----------------------------------------------------------------------
diff --git 
a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
 
b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
index b39601a..bbe2001 100644
--- 
a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
+++ 
b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.slider.api.ClusterDescription;
@@ -110,6 +111,7 @@ public class AgentProviderService extends 
AbstractProviderService implements
   private static final String GLOBAL_CONFIG_TAG = "global";
   private static final String LOG_FOLDERS_TAG = "LogFolders";
   private static final String COMPONENT_DATA_TAG = "ComponentInstanceData";
+  private static final String SHARED_PORT_TAG = "{SHARED}";
   private static final int MAX_LOG_ENTRIES = 20;
   private static final int DEFAULT_HEARTBEAT_MONITOR_INTERVAL = 60 * 1000;
   private final Object syncLock = new Object();
@@ -290,116 +292,6 @@ public class AgentProviderService extends 
AbstractProviderService implements
   }
 
   /**
-   * Reads and sets the heartbeat monitoring interval. If bad value is 
provided then log it and set to default.
-   * @param instanceDefinition
-   */
-  private void readAndSetHeartbeatMonitoringInterval(AggregateConf 
instanceDefinition) {
-    String hbMonitorInterval = instanceDefinition.getAppConfOperations().
-        getGlobalOptions().getOption(AgentKeys.HEARTBEAT_MONITOR_INTERVAL,
-                                     
Integer.toString(DEFAULT_HEARTBEAT_MONITOR_INTERVAL));
-    try {
-      setHeartbeatMonitorInterval(Integer.parseInt(hbMonitorInterval));
-    }catch (NumberFormatException e) {
-      log.warn(
-          "Bad value {} for {}. Defaulting to ",
-          hbMonitorInterval,
-          HEARTBEAT_MONITOR_INTERVAL,
-          DEFAULT_HEARTBEAT_MONITOR_INTERVAL);
-    }
-  }
-
-  /**
-   * Reads and sets the heartbeat monitoring interval. If bad value is 
provided then log it and set to default.
-   * @param instanceDefinition
-   */
-  private void initializeAgentDebugCommands(AggregateConf instanceDefinition) {
-    String launchParameterStr = instanceDefinition.getAppConfOperations().
-        getGlobalOptions().getOption(AgentKeys.AGENT_INSTANCE_DEBUG_DATA, "");
-    agentLaunchParameter = new AgentLaunchParameter(launchParameterStr);
-  }
-
-  @VisibleForTesting
-  protected Metainfo getMetainfo() {
-    return this.metainfo;
-  }
-
-  @VisibleForTesting
-  protected Map<String, ComponentInstanceState> getComponentStatuses() {
-    return componentStatuses;
-  }
-
-  @VisibleForTesting
-  protected Metainfo getApplicationMetainfo(SliderFileSystem fileSystem,
-                                            String appDef) throws IOException {
-    return AgentUtils.getApplicationMetainfo(fileSystem, appDef);
-  }
-
-  @VisibleForTesting
-  protected void setHeartbeatMonitorInterval(int heartbeatMonitorInterval) {
-    this.heartbeatMonitorInterval = heartbeatMonitorInterval;
-  }
-
-  private int getHeartbeatMonitorInterval() {
-    return this.heartbeatMonitorInterval;
-  }
-
-  /**
-   * Publish a named config bag that may contain name-value pairs for app 
configurations such as hbase-site
-   * @param name
-   * @param description
-   * @param entries
-   */
-  protected void publishComponentConfiguration(String name, String description,
-                                               Iterable<Map.Entry<String, 
String>> entries) {
-    PublishedConfiguration pubconf = new PublishedConfiguration();
-    pubconf.description = description;
-    pubconf.putValues(entries);
-    log.info("publishing {}", pubconf);
-    getAmState().getPublishedSliderConfigurations().put(name, pubconf);
-  }
-
-  /**
-   * Get a list of all hosts for all role/container per role
-   * @return
-   */
-  protected Map<String, Map<String, ClusterNode>> getRoleClusterNodeMapping() {
-    amState.refreshClusterStatus();
-    return (Map<String, Map<String, ClusterNode>>)
-        amState.getClusterStatus().status.get(
-            ClusterDescriptionKeys.KEY_CLUSTER_LIVE);
-  }
-
-  private String getContainerLabel(Container container, String role) {
-    return container.getId().toString() + LABEL_MAKER + role;
-  }
-
-  protected String getClusterInfoPropertyValue(String name) {
-    StateAccessForProviders accessor = getAmState();
-    assert accessor.isApplicationLive();
-    ClusterDescription description = accessor.getClusterStatus();
-    return description.getInfo(name);
-  }
-
-  /**
-   * Lost heartbeat from the container - release it and ask for a replacement
-   *
-   * @param label
-   *
-   * @return if release is requested successfully
-   */
-  protected boolean releaseContainer(String label) {
-    componentStatuses.remove(label);
-    try {
-      getAppMaster().refreshContainer(getContainerId(label), true);
-    } catch (SliderException e) {
-      log.info("Error while requesting container release for {}. Message: {}", 
label, e.getMessage());
-      return false;
-    }
-
-    return true;
-  }
-
-  /**
    * Run this service
    *
    * @param instanceDefinition component description
@@ -421,16 +313,6 @@ public class AgentProviderService extends 
AbstractProviderService implements
     return false;
   }
 
-  /**
-   * Build the provider status, can be empty
-   *
-   * @return the provider status - map of entries to add to the info section
-   */
-  public Map<String, String> buildProviderStatus() {
-    Map<String, String> stats = new HashMap<>();
-    return stats;
-  }
-
   @Override
   public boolean isSupportedRole(String role) {
     return true;
@@ -489,7 +371,7 @@ public class AgentProviderService extends 
AbstractProviderService implements
     // Otherwise, wait till the master that can publish is ready
     if (isMaster &&
         (canAnyMasterPublishConfig() == false || canPublishConfig(roleName))) {
-      processReturnedStatus(heartBeat, componentStatus);
+      publishConfigAndExportGroups(heartBeat, componentStatus);
     }
 
     List<CommandReport> reports = heartBeat.getReports();
@@ -511,7 +393,7 @@ public class AgentProviderService extends 
AbstractProviderService implements
       log.info("Component operation. Status: {}", result);
 
       if (command == Command.INSTALL && report.getFolders() != null && 
report.getFolders().size() > 0) {
-        processFolderPaths(report.getFolders(), containerId, 
heartBeat.getFqdn());
+        publishLogFolderPaths(report.getFolders(), containerId, 
heartBeat.getFqdn());
       }
     }
 
@@ -558,13 +440,168 @@ public class AgentProviderService extends 
AbstractProviderService implements
     return response;
   }
 
+  @Override
+  public Map<String, String> buildMonitorDetails(ClusterDescription 
clusterDesc) {
+    Map<String, String> details = super.buildMonitorDetails(clusterDesc);
+    buildRoleHostDetails(details);
+    return details;
+  }
+
+  @Override
+  public void applyInitialRegistryDefinitions(URL unsecureWebAPI,
+                                              URL secureWebAPI,
+                                              ServiceInstanceData 
instanceData) throws IOException {
+    super.applyInitialRegistryDefinitions(unsecureWebAPI,
+                                          secureWebAPI,
+                                          instanceData
+    );
+
+    try {
+      instanceData.internalView.endpoints.put(
+          CustomRegistryConstants.AGENT_REST_API,
+          new RegisteredEndpoint(
+              new URL(secureWebAPI, SLIDER_PATH_AGENTS),
+              "Agent REST API"));
+    } catch (URISyntaxException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public void notifyContainerCompleted(ContainerId containerId) {
+    if(containerId != null) {
+      String containerIdStr = containerId.toString();
+    }
+  }
+
   /**
-   * Format the folder locations before publishing in the registry service
+   * Reads and sets the heartbeat monitoring interval. If bad value is 
provided then log it and set to default.
+   * @param instanceDefinition
+   */
+  private void readAndSetHeartbeatMonitoringInterval(AggregateConf 
instanceDefinition) {
+    String hbMonitorInterval = instanceDefinition.getAppConfOperations().
+        getGlobalOptions().getOption(AgentKeys.HEARTBEAT_MONITOR_INTERVAL,
+                                     
Integer.toString(DEFAULT_HEARTBEAT_MONITOR_INTERVAL));
+    try {
+      setHeartbeatMonitorInterval(Integer.parseInt(hbMonitorInterval));
+    }catch (NumberFormatException e) {
+      log.warn(
+          "Bad value {} for {}. Defaulting to ",
+          hbMonitorInterval,
+          HEARTBEAT_MONITOR_INTERVAL,
+          DEFAULT_HEARTBEAT_MONITOR_INTERVAL);
+    }
+  }
+
+  /**
+   * Reads and sets the heartbeat monitoring interval. If bad value is 
provided then log it and set to default.
+   * @param instanceDefinition
+   */
+  private void initializeAgentDebugCommands(AggregateConf instanceDefinition) {
+    String launchParameterStr = instanceDefinition.getAppConfOperations().
+        getGlobalOptions().getOption(AgentKeys.AGENT_INSTANCE_DEBUG_DATA, "");
+    agentLaunchParameter = new AgentLaunchParameter(launchParameterStr);
+  }
+
+  @VisibleForTesting
+  protected Metainfo getMetainfo() {
+    return this.metainfo;
+  }
+
+  @VisibleForTesting
+  protected Map<String, ComponentInstanceState> getComponentStatuses() {
+    return componentStatuses;
+  }
+
+  @VisibleForTesting
+  protected Metainfo getApplicationMetainfo(SliderFileSystem fileSystem,
+                                            String appDef) throws IOException {
+    return AgentUtils.getApplicationMetainfo(fileSystem, appDef);
+  }
+
+  @VisibleForTesting
+  protected void setHeartbeatMonitorInterval(int heartbeatMonitorInterval) {
+    this.heartbeatMonitorInterval = heartbeatMonitorInterval;
+  }
+
+  private int getHeartbeatMonitorInterval() {
+    return this.heartbeatMonitorInterval;
+  }
+
+  /**
+   * Publish a named property bag that may contain name-value pairs for app 
configurations such as hbase-site
+   * @param name
+   * @param description
+   * @param entries
+   */
+  protected void publishComponentConfiguration(String name, String description,
+                                               Iterable<Map.Entry<String, 
String>> entries) {
+    PublishedConfiguration pubconf = new PublishedConfiguration();
+    pubconf.description = description;
+    pubconf.putValues(entries);
+    log.info("publishing {}", pubconf);
+    getAmState().getPublishedSliderConfigurations().put(name, pubconf);
+  }
+
+  /**
+   * Get a list of all hosts for all role/container per role
+   * @return
+   */
+  protected Map<String, Map<String, ClusterNode>> getRoleClusterNodeMapping() {
+    amState.refreshClusterStatus();
+    return (Map<String, Map<String, ClusterNode>>)
+        amState.getClusterStatus().status.get(
+            ClusterDescriptionKeys.KEY_CLUSTER_LIVE);
+  }
+
+  private String getContainerLabel(Container container, String role) {
+    return container.getId().toString() + LABEL_MAKER + role;
+  }
+
+  protected String getClusterInfoPropertyValue(String name) {
+    StateAccessForProviders accessor = getAmState();
+    assert accessor.isApplicationLive();
+    ClusterDescription description = accessor.getClusterStatus();
+    return description.getInfo(name);
+  }
+
+  /**
+   * Lost heartbeat from the container - release it and ask for a replacement
+   *
+   * @param label
+   *
+   * @return if release is requested successfully
+   */
+  protected boolean releaseContainer(String label) {
+    componentStatuses.remove(label);
+    try {
+      getAppMaster().refreshContainer(getContainerId(label), true);
+    } catch (SliderException e) {
+      log.info("Error while requesting container release for {}. Message: {}", 
label, e.getMessage());
+      return false;
+    }
+
+    return true;
+  }
+
+  /**
+   * Build the provider status, can be empty
+   *
+   * @return the provider status - map of entries to add to the info section
+   */
+  public Map<String, String> buildProviderStatus() {
+    Map<String, String> stats = new HashMap<>();
+    return stats;
+  }
+
+
+  /**
+   * Format the folder locations and publish in the registry service
    * @param folders
    * @param containerId
    * @param hostFqdn
    */
-  private void processFolderPaths(Map<String, String> folders, String 
containerId, String hostFqdn) {
+  private void publishLogFolderPaths(Map<String, String> folders, String 
containerId, String hostFqdn) {
     for (String key : folders.keySet()) {
       workFolders.put(String.format("%s-%s-%s", hostFqdn, containerId, key), 
folders.get(key));
     }
@@ -578,7 +615,7 @@ public class AgentProviderService extends 
AbstractProviderService implements
    * @param heartBeat
    * @param componentStatus
    */
-  protected void processReturnedStatus(HeartBeat heartBeat, 
ComponentInstanceState componentStatus) {
+  protected void publishConfigAndExportGroups(HeartBeat heartBeat, 
ComponentInstanceState componentStatus) {
     List<ComponentStatus> statuses = heartBeat.getComponentStatus();
     if (statuses != null && !statuses.isEmpty()) {
       log.info("Processing {} status reports.", statuses.size());
@@ -658,11 +695,11 @@ public class AgentProviderService extends 
AbstractProviderService implements
               boolean publishData = false;
               String portValPattern = String.format(portVarFormat, portName);
               if(templateToExport.contains(portValPattern)) {
-                templateToExport.replace(portValPattern, ports.get(portName));
+                templateToExport = templateToExport.replace(portValPattern, 
ports.get(portName));
                 publishData = true;
               }
               if(templateToExport.contains(hostNamePattern)) {
-                templateToExport.replace(hostNamePattern, hostFqdn);
+                templateToExport = templateToExport.replace(hostNamePattern, 
hostFqdn);
                 publishData = true;
               }
               if(publishData) {
@@ -1032,9 +1069,13 @@ public class AgentProviderService extends 
AbstractProviderService implements
     //apply any port updates
     if (!this.getAllocatedPorts().isEmpty()) {
       for (String key : config.keySet()) {
-        String lookupKey = configName + "." + key;
-        if (this.getAllocatedPorts().containsKey(lookupKey)) {
-          config.put(key, getAllocatedPorts().get(lookupKey));
+        String value = config.get(key);
+        if(!value.contains(SHARED_PORT_TAG)) {
+          // If the config property is not shared then do not pass on the 
already allocated value
+          String lookupKey = configName + "." + key;
+          if (this.getAllocatedPorts().containsKey(lookupKey)) {
+            config.put(key, getAllocatedPorts().get(lookupKey));
+          }
         }
       }
     }
@@ -1065,13 +1106,6 @@ public class AgentProviderService extends 
AbstractProviderService implements
     config.put("app_install_dir", "${AGENT_WORK_ROOT}/app/install");
   }
 
-  @Override
-  public Map<String, String> buildMonitorDetails(ClusterDescription 
clusterDesc) {
-    Map<String, String> details = super.buildMonitorDetails(clusterDesc);
-    buildRoleHostDetails(details);
-    return details;
-  }
-
   private void buildRoleHostDetails(Map<String, String> details) {
     for (Map.Entry<String, Map<String, ClusterNode>> entry :
         getRoleClusterNodeMapping().entrySet()) {
@@ -1080,25 +1114,4 @@ public class AgentProviderService extends 
AbstractProviderService implements
                   "");
     }
   }
-
-  @Override
-  public void applyInitialRegistryDefinitions(URL unsecureWebAPI,
-                                              URL secureWebAPI,
-                                              ServiceInstanceData 
instanceData) throws IOException {
-    super.applyInitialRegistryDefinitions(unsecureWebAPI,
-                                          secureWebAPI,
-                                          instanceData
-    );
-
-    try {
-      instanceData.internalView.endpoints.put(
-          CustomRegistryConstants.AGENT_REST_API,
-          new RegisteredEndpoint(
-              new URL(secureWebAPI, SLIDER_PATH_AGENTS),
-              "Agent REST API"));
-    } catch (URISyntaxException e) {
-      throw new IOException(e);
-    }
-
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/dca4818c/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
----------------------------------------------------------------------
diff --git 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
index 7e0ae5e..a2dc36f 100644
--- 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
+++ 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
@@ -1061,6 +1061,8 @@ public class SliderAppMaster extends 
AbstractSliderLaunchedService
         RoleInstance ri = result.roleInstance;
         log.error("Role instance {} failed ", ri);
       }
+
+      getProviderService().notifyContainerCompleted(containerId);
     }
 
     // ask for more containers if any failed

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/dca4818c/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java
----------------------------------------------------------------------
diff --git 
a/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java
 
b/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java
index 4d63263..1f9cb41 100644
--- 
a/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java
+++ 
b/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java
@@ -355,6 +355,45 @@ public class TestAgentProviderService {
   }
 
   @Test
+  public void testComponentSpecificPublishes() throws Exception {
+    InputStream metainfo_1 = new 
ByteArrayInputStream(metainfo_1_str.getBytes());
+    Metainfo metainfo = new MetainfoParser().parse(metainfo_1);
+    AgentProviderService aps = new AgentProviderService();
+    AgentProviderService mockAps = Mockito.spy(aps);
+    doNothing().when(mockAps).publishComponentConfiguration(anyString(), 
anyString(), anyCollection());
+    doReturn(metainfo).when(mockAps).getMetainfo();
+
+    Map<String, String> ports = new HashMap<>();
+    ports.put("global.listen_port", "10010");
+    mockAps.processComponentSpecificPublishes(ports,
+                                              "cid1",
+                                              "host1",
+                                              "HBASE_REGIONSERVER");
+    ArgumentCaptor<Collection> entriesCaptor = ArgumentCaptor.
+        forClass(Collection.class);
+    ArgumentCaptor<String> publishNameCaptor = ArgumentCaptor.
+        forClass(String.class);
+    Mockito.verify(mockAps, Mockito.times(1)).publishComponentConfiguration(
+        anyString(),
+        publishNameCaptor.capture(),
+        entriesCaptor.capture());
+    assert entriesCaptor.getAllValues().size() == 1;
+    for (Collection coll : entriesCaptor.getAllValues()) {
+      Set<Map.Entry<String, String>> entrySet = (Set<Map.Entry<String, 
String>>) coll;
+      for (Map.Entry entry : entrySet) {
+        log.info("{}:{}", entry.getKey(), entry.getValue().toString());
+        if (entry.getKey().equals("PropertyA")) {
+          assert entry.getValue().toString().equals("host1:10010");
+        }
+      }
+    }
+    assert publishNameCaptor.getAllValues().size() == 1;
+    for (String coll : publishNameCaptor.getAllValues()) {
+      assert coll.equals("ComponentInstanceData");
+    }
+  }
+
+  @Test
   public void testProcessConfig() throws Exception {
     InputStream metainfo_1 = new 
ByteArrayInputStream(metainfo_1_str.getBytes());
     Metainfo metainfo = new MetainfoParser().parse(metainfo_1);
@@ -387,16 +426,16 @@ public class TestAgentProviderService {
     doReturn(metainfo).when(mockAps).getMetainfo();
     doReturn(roleClusterNodeMap).when(mockAps).getRoleClusterNodeMapping();
 
-    mockAps.processReturnedStatus(hb, componentStatus);
+    mockAps.publishConfigAndExportGroups(hb, componentStatus);
     assert componentStatus.getConfigReported() == true;
-    ArgumentCaptor<Collection> commandCaptor = ArgumentCaptor.
+    ArgumentCaptor<Collection> entriesCaptor = ArgumentCaptor.
         forClass(Collection.class);
     Mockito.verify(mockAps, Mockito.times(3)).publishComponentConfiguration(
         anyString(),
         anyString(),
-        commandCaptor.capture());
-    assert commandCaptor.getAllValues().size() == 3;
-    for (Collection coll : commandCaptor.getAllValues()) {
+        entriesCaptor.capture());
+    assert entriesCaptor.getAllValues().size() == 3;
+    for (Collection coll : entriesCaptor.getAllValues()) {
       Set<Map.Entry<String, String>> entrySet = (Set<Map.Entry<String, 
String>>) coll;
       for (Map.Entry entry : entrySet) {
         log.info("{}:{}", entry.getKey(), entry.getValue().toString());

Reply via email to